[ https://issues.apache.org/jira/browse/FLINK-20066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17228930#comment-17228930 ]
Dian Fu commented on FLINK-20066:
---------------------------------
cc [~hxbks2ks]
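
For context, the assertion that fails in the log below is the metric check inside the test's pandas UDAF (MaxAdd.get_value, test_pandas_udaf.py line 709): self.assertEqual(self.counter_sum, self.counter.get_count()) fails with "30 != 10". As a reference for readers, here is a minimal sketch of that pattern, assuming the usual PyFlink 1.12 vectorized aggregate API (AggregateFunction/udaf from pyflink.table.udf and the FunctionContext metric group); the class name "MetricCheckingMaxAdd", the counter name "my_counter" and the inc(10) step are illustrative only and are not the actual MaxAdd source:

{code}
# Hypothetical sketch only -- not the MaxAdd defined in pyflink/table/tests/test_pandas_udaf.py.
from pyflink.table import DataTypes
from pyflink.table.udf import AggregateFunction, udaf


class MetricCheckingMaxAdd(AggregateFunction):

    def open(self, function_context):
        # Register a counter on the function's metric group
        # (requires python.metric.enabled=true, as set in the failing test).
        self.counter = function_context.get_metric_group().counter("my_counter")
        self.counter_sum = 0

    def create_accumulator(self):
        return []

    def accumulate(self, accumulator, *args):
        # Vectorized (pandas) UDAF: each arg is a pandas.Series covering the whole group.
        accumulator.append(sum(arg.max() for arg in args))

    def get_value(self, accumulator):
        # Bump the counter and cross-check the value reported back by the metric group.
        # The real test performs this check via unittest's assertEqual, which is the
        # call that fails with "30 != 10" in the traceback below.
        self.counter.inc(10)
        self.counter_sum += 10
        assert self.counter_sum == self.counter.get_count()
        return accumulator[0]


max_add = udaf(MetricCheckingMaxAdd(), result_type=DataTypes.INT(), func_type="pandas")
{code}

The counter is registered in open() against the function's metric group, and get_value() compares the locally accumulated counter_sum against counter.get_count(); that is exactly the pair of values that disagrees (30 != 10) in the failure below.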
> BatchPandasUDAFITTests.test_group_aggregate_with_aux_group unstable
> -------------------------------------------------------------------
>
> Key: FLINK-20066
> URL: https://issues.apache.org/jira/browse/FLINK-20066
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Priority: Major
> Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9361&view=logs&j=bdd9ea51-4de2-506a-d4d9-f3930e4d2355&t=98717c4f-b888-5636-bb1e-db7aca25755e
> {code}
> 2020-11-09T23:41:41.8853547Z =================================== FAILURES ===================================
> 2020-11-09T23:41:41.8854000Z __________ BatchPandasUDAFITTests.test_group_aggregate_with_aux_group __________
> 2020-11-09T23:41:41.8854324Z
> 2020-11-09T23:41:41.8854647Z self = <pyflink.table.tests.test_pandas_udaf.BatchPandasUDAFITTests testMethod=test_group_aggregate_with_aux_group>
> 2020-11-09T23:41:41.8854956Z
> 2020-11-09T23:41:41.8855205Z     def test_group_aggregate_with_aux_group(self):
> 2020-11-09T23:41:41.8855521Z         t = self.t_env.from_elements(
> 2020-11-09T23:41:41.8858372Z             [(1, 2, 3), (3, 2, 3), (2, 1, 3), (1, 5, 4), (1, 8, 6), (2, 3, 4)],
> 2020-11-09T23:41:41.8858807Z             DataTypes.ROW(
> 2020-11-09T23:41:41.8859091Z                 [DataTypes.FIELD("a", DataTypes.TINYINT()),
> 2020-11-09T23:41:41.8859453Z                  DataTypes.FIELD("b", DataTypes.SMALLINT()),
> 2020-11-09T23:41:41.8859783Z                  DataTypes.FIELD("c", DataTypes.INT())]))
> 2020-11-09T23:41:41.8860012Z
> 2020-11-09T23:41:41.8860255Z         table_sink = source_sink_utils.TestAppendSink(
> 2020-11-09T23:41:41.8863051Z             ['a', 'b', 'c', 'd'],
> 2020-11-09T23:41:41.8863523Z             [DataTypes.TINYINT(), DataTypes.INT(), DataTypes.FLOAT(), DataTypes.INT()])
> 2020-11-09T23:41:41.8864048Z         self.t_env.register_table_sink("Results", table_sink)
> 2020-11-09T23:41:41.8864715Z         self.t_env.get_config().get_configuration().set_string('python.metric.enabled', 'true')
> 2020-11-09T23:41:41.8865161Z         self.t_env.register_function("max_add", udaf(MaxAdd(),
> 2020-11-09T23:41:41.8865573Z                                                      result_type=DataTypes.INT(),
> 2020-11-09T23:41:41.8865999Z                                                      func_type="pandas"))
> 2020-11-09T23:41:41.8866426Z         self.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
> 2020-11-09T23:41:41.8866759Z         t.group_by("a") \
> 2020-11-09T23:41:41.8867052Z             .select("a, a + 1 as b, a + 2 as c") \
> 2020-11-09T23:41:41.8867352Z             .group_by("a, b") \
> 2020-11-09T23:41:41.8867660Z             .select("a, b, mean_udaf(b), max_add(b, c, 1)") \
> 2020-11-09T23:41:41.8868026Z >           .execute_insert("Results") \
> 2020-11-09T23:41:41.8868293Z             .wait()
> 2020-11-09T23:41:41.8868446Z
> 2020-11-09T23:41:41.8868704Z pyflink/table/tests/test_pandas_udaf.py:95:
> 2020-11-09T23:41:41.8869077Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> 2020-11-09T23:41:41.8869464Z pyflink/table/table_result.py:76: in wait
> 2020-11-09T23:41:41.8870150Z     get_method(self._j_table_result, "await")()
> 2020-11-09T23:41:41.8870850Z .tox/py37-cython/lib/python3.7/site-packages/py4j/java_gateway.py:1286: in __call__
> 2020-11-09T23:41:41.8871415Z     answer, self.gateway_client, self.target_id, self.name)
> 2020-11-09T23:41:41.8871768Z pyflink/util/exceptions.py:147: in deco
> 2020-11-09T23:41:41.8872032Z     return f(*a, **kw)
> 2020-11-09T23:41:41.8872378Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> 2020-11-09T23:41:41.8872629Z
> 2020-11-09T23:41:41.8872978Z answer = 'xro24238'
> 2020-11-09T23:41:41.8873296Z gateway_client = <py4j.java_gateway.GatewayClient object at 0x7f0be26f2358>
> 2020-11-09T23:41:41.8873792Z target_id = 'o24237', name = 'await'
> 2020-11-09T23:41:41.8874097Z
> 2020-11-09T23:41:41.8874433Z def get_return_value(answer, gateway_client, target_id=None, name=None):
> 2020-11-09T23:41:41.8874893Z     """Converts an answer received from the Java gateway into a Python object.
> 2020-11-09T23:41:41.8875212Z
> 2020-11-09T23:41:41.8875515Z     For example, string representation of integers are converted to Python
> 2020-11-09T23:41:41.8875922Z     integer, string representation of objects are converted to JavaObject
> 2020-11-09T23:41:41.8876255Z     instances, etc.
> 2020-11-09T23:41:41.8876584Z
> 2020-11-09T23:41:41.8876873Z     :param answer: the string returned by the Java gateway
> 2020-11-09T23:41:41.8877307Z     :param gateway_client: the gateway client used to communicate with the Java
> 2020-11-09T23:41:41.8877749Z         Gateway. Only necessary if the answer is a reference (e.g., object, list, map)
> 2020-11-09T23:41:41.8878646Z     :param target_id: the name of the object from which the answer comes from
> 2020-11-09T23:41:41.8879154Z         (e.g., *object1* in `object1.hello()`). Optional.
> 2020-11-09T23:41:41.8879679Z     :param name: the name of the member from which the answer comes from
> 2020-11-09T23:41:41.8880097Z         (e.g., *hello* in `object1.hello()`). Optional.
> 2020-11-09T23:41:41.8880371Z     """
> 2020-11-09T23:41:41.8880621Z     if is_error(answer)[0]:
> 2020-11-09T23:41:41.8881088Z         if len(answer) > 1:
> 2020-11-09T23:41:41.8881457Z             type = answer[1]
> 2020-11-09T23:41:41.8881967Z             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
> 2020-11-09T23:41:41.8882417Z             if answer[1] == REFERENCE_TYPE:
> 2020-11-09T23:41:41.8882754Z                 raise Py4JJavaError(
> 2020-11-09T23:41:41.8883113Z                     "An error occurred while calling {0}{1}{2}.\n".
> 2020-11-09T23:41:41.8883490Z >                   format(target_id, ".", name), value)
> 2020-11-09T23:41:41.8883935Z E       py4j.protocol.Py4JJavaError: An error occurred while calling o24237.await.
> 2020-11-09T23:41:41.8884552Z E       : java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
> 2020-11-09T23:41:41.8885193Z E         at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-11-09T23:41:41.8885796Z E         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-11-09T23:41:41.8886595Z E         at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:119)
> 2020-11-09T23:41:41.8887337Z E         at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
> 2020-11-09T23:41:41.8887976Z E         at sun.reflect.GeneratedMethodAccessor225.invoke(Unknown Source)
> 2020-11-09T23:41:41.8888520Z E         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-11-09T23:41:41.8889053Z E         at java.lang.reflect.Method.invoke(Method.java:498)
> 2020-11-09T23:41:41.8889688Z E         at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 2020-11-09T23:41:41.8890493Z E         at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 2020-11-09T23:41:41.8891500Z E         at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> 2020-11-09T23:41:41.8892134Z E         at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 2020-11-09T23:41:41.8892797Z E         at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> 2020-11-09T23:41:41.8893443Z E         at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> 2020-11-09T23:41:41.8893959Z E         at java.lang.Thread.run(Thread.java:748)
> 2020-11-09T23:41:41.8894452Z E       Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
> 2020-11-09T23:41:41.8895065Z E         at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:59)
> 2020-11-09T23:41:41.8895748Z E         at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
> 2020-11-09T23:41:41.8896497Z E         at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:368)
> 2020-11-09T23:41:41.8897328Z E         at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:107)
> 2020-11-09T23:41:41.8897954Z E         at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
> 2020-11-09T23:41:41.8898554Z E         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-11-09T23:41:41.8899153Z E         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-11-09T23:41:41.8899678Z E         ... 1 more
> 2020-11-09T23:41:41.8900197Z E       Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-11-09T23:41:41.8900946Z E         at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-11-09T23:41:41.8901597Z E         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-11-09T23:41:41.8902192Z E         at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:57)
> 2020-11-09T23:41:41.8902668Z E         ... 7 more
> 2020-11-09T23:41:41.8903107Z E       Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-11-09T23:41:41.8903704Z E         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> 2020-11-09T23:41:41.8904393Z E         at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
> 2020-11-09T23:41:41.8905066Z E         at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2020-11-09T23:41:41.8905647Z E         at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 2020-11-09T23:41:41.8906253Z E         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2020-11-09T23:41:41.8906842Z E         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2020-11-09T23:41:41.8907475Z E         at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
> 2020-11-09T23:41:41.8908136Z E         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2020-11-09T23:41:41.8908757Z E         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2020-11-09T23:41:41.8909434Z E         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2020-11-09T23:41:41.8910095Z E         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2020-11-09T23:41:41.8910691Z E         at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
> 2020-11-09T23:41:41.8911343Z E         at akka.dispatch.OnComplete.internal(Future.scala:264)
> 2020-11-09T23:41:41.8911826Z E         at akka.dispatch.OnComplete.internal(Future.scala:261)
> 2020-11-09T23:41:41.8912326Z E         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2020-11-09T23:41:41.8912822Z E         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2020-11-09T23:41:41.8913336Z E         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2020-11-09T23:41:41.8914205Z E         at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> 2020-11-09T23:41:41.8914918Z E         at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> 2020-11-09T23:41:41.8915503Z E         at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> 2020-11-09T23:41:41.8916158Z E         at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> 2020-11-09T23:41:41.8916723Z E         at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> 2020-11-09T23:41:41.8917388Z E         at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> 2020-11-09T23:41:41.8917992Z E         at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> 2020-11-09T23:41:41.8918539Z E         at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> 2020-11-09T23:41:41.8919041Z E         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2020-11-09T23:41:41.8919696Z E         at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 2020-11-09T23:41:41.8920328Z E         at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> 2020-11-09T23:41:41.8921129Z E         at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 2020-11-09T23:41:41.8921790Z E         at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 2020-11-09T23:41:41.8922396Z E         at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> 2020-11-09T23:41:41.8922951Z E         at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> 2020-11-09T23:41:41.8923502Z E         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 2020-11-09T23:41:41.8924115Z E         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> 2020-11-09T23:41:41.8924715Z E         at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 2020-11-09T23:41:41.8925275Z E         at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 2020-11-09T23:41:41.8925845Z E         at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 2020-11-09T23:41:41.8926402Z E         at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-11-09T23:41:41.8927017Z E       Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> 2020-11-09T23:41:41.8927737Z E         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
> 2020-11-09T23:41:41.8928609Z E         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
> 2020-11-09T23:41:41.8929366Z E         at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
> 2020-11-09T23:41:41.8930124Z E         at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
> 2020-11-09T23:41:41.8930893Z E         at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
> 2020-11-09T23:41:41.8931631Z E         at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:534)
> 2020-11-09T23:41:41.8932264Z E         at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> 2020-11-09T23:41:41.8932883Z E         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
> 2020-11-09T23:41:41.8933389Z E         at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
> 2020-11-09T23:41:41.8933910Z E         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-11-09T23:41:41.8934575Z E         at java.lang.reflect.Method.invoke(Method.java:498)
> 2020-11-09T23:41:41.8935103Z E         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
> 2020-11-09T23:41:41.8935719Z E         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
> 2020-11-09T23:41:41.8936349Z E         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> 2020-11-09T23:41:41.8936952Z E         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> 2020-11-09T23:41:41.8937525Z E         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> 2020-11-09T23:41:41.8938053Z E         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> 2020-11-09T23:41:41.8938571Z E         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 2020-11-09T23:41:41.8939116Z E         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> 2020-11-09T23:41:41.8939747Z E         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> 2020-11-09T23:41:41.8940259Z E         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2020-11-09T23:41:41.8940844Z E         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2020-11-09T23:41:41.8941682Z E         at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> 2020-11-09T23:41:41.8942198Z E         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> 2020-11-09T23:41:41.8942739Z E         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> 2020-11-09T23:41:41.8943237Z E         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> 2020-11-09T23:41:41.8943713Z E         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> 2020-11-09T23:41:41.8944207Z E         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> 2020-11-09T23:41:41.8944660Z E         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> 2020-11-09T23:41:41.8945007Z E         ... 4 more
> 2020-11-09T23:41:41.8945402Z E       Caused by: java.lang.RuntimeException: Failed to close remote bundle
> 2020-11-09T23:41:41.8946046Z E         at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
> 2020-11-09T23:41:41.8946794Z E         at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
> 2020-11-09T23:41:41.8947691Z E         at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
> 2020-11-09T23:41:41.8948546Z E         at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.endInput(AbstractOneInputPythonFunctionOperator.java:42)
> 2020-11-09T23:41:41.8949613Z E         at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:95)
> 2020-11-09T23:41:41.8950643Z E         at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
> 2020-11-09T23:41:41.8951667Z E         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
> 2020-11-09T23:41:41.8952406Z E         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
> 2020-11-09T23:41:41.8953139Z E         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> 2020-11-09T23:41:41.8954086Z E         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
> 2020-11-09T23:41:41.8954847Z E         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> 2020-11-09T23:41:41.8955495Z E         at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
> 2020-11-09T23:41:41.8956082Z E         at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587)
> 2020-11-09T23:41:41.8956660Z E         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
> 2020-11-09T23:41:41.8957218Z E         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> 2020-11-09T23:41:41.8957747Z E         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> 2020-11-09T23:41:41.8958311Z E         at java.lang.Thread.run(Thread.java:748)
> 2020-11-09T23:41:41.8959085Z E       Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
> 2020-11-09T23:41:41.8960839Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
> 2020-11-09T23:41:41.8961406Z E           response = task()
> 2020-11-09T23:41:41.8962160Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
> 2020-11-09T23:41:41.8962733Z E           lambda: self.create_worker().do_instruction(request), request)
> 2020-11-09T23:41:41.8963624Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
> 2020-11-09T23:41:41.8964156Z E           getattr(request, request_type), request.instruction_id)
> 2020-11-09T23:41:41.8964947Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
> 2020-11-09T23:41:41.8965477Z E           bundle_processor.process_bundle(instruction_id))
> 2020-11-09T23:41:41.8966276Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
> 2020-11-09T23:41:41.8966767Z E           element.data)
> 2020-11-09T23:41:41.8967636Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
> 2020-11-09T23:41:41.8968489Z E           self.output(decoded_value)
> 2020-11-09T23:41:41.8968975Z E         File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
> 2020-11-09T23:41:41.8969676Z E         File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
> 2020-11-09T23:41:41.8970276Z E         File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> 2020-11-09T23:41:41.8971101Z E         File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
> 2020-11-09T23:41:41.8971649Z E           with self.scoped_process_state:
> 2020-11-09T23:41:41.8972182Z E         File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 75, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
> 2020-11-09T23:41:41.8972711Z E           self.func(value), self.consumer.output_stream, True)
> 2020-11-09T23:41:41.8973182Z E         File "<string>", line 1, in <lambda>
> 2020-11-09T23:41:41.8973840Z E         File "/__w/2/s/flink-python/pyflink/table/udf.py", line 269, in eval
> 2020-11-09T23:41:41.8974254Z E           return self.func.get_value(accumulator)
> 2020-11-09T23:41:41.8974892Z E         File "/__w/2/s/flink-python/pyflink/table/tests/test_pandas_udaf.py", line 709, in get_value
> 2020-11-09T23:41:41.8975341Z E           self.assertEqual(self.counter_sum, self.counter.get_count())
> 2020-11-09T23:41:41.8976040Z E         File "/__w/2/s/flink-python/dev/.conda/lib/python3.7/unittest/case.py", line 839, in assertEqual
> 2020-11-09T23:41:41.8976492Z E           assertion_func(first, second, msg=msg)
> 2020-11-09T23:41:41.8977147Z E         File "/__w/2/s/flink-python/dev/.conda/lib/python3.7/unittest/case.py", line 832, in _baseAssertEqual
> 2020-11-09T23:41:41.8977616Z E           raise self.failureException(msg)
> 2020-11-09T23:41:41.8977964Z E       AssertionError: 30 != 10
> 2020-11-09T23:41:41.8978238Z E
> 2020-11-09T23:41:41.8978633Z E         at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-11-09T23:41:41.8979293Z E         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-11-09T23:41:41.8979986Z E         at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
> 2020-11-09T23:41:41.8980619Z E         at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
> 2020-11-09T23:41:41.8981558Z E         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
> 2020-11-09T23:41:41.8982319Z E         at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366)
> 2020-11-09T23:41:41.8982840Z E         ... 16 more
> 2020-11-09T23:41:41.8983356Z E       Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
> 2020-11-09T23:41:41.8984329Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
> 2020-11-09T23:41:41.8984816Z E           response = task()
> 2020-11-09T23:41:41.8985619Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
> 2020-11-09T23:41:41.8986330Z E           lambda: self.create_worker().do_instruction(request), request)
> 2020-11-09T23:41:41.8987155Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
> 2020-11-09T23:41:41.8987713Z E           getattr(request, request_type), request.instruction_id)
> 2020-11-09T23:41:41.8988500Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
> 2020-11-09T23:41:41.8989029Z E           bundle_processor.process_bundle(instruction_id))
> 2020-11-09T23:41:41.8989883Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
> 2020-11-09T23:41:41.8990359Z E           element.data)
> 2020-11-09T23:41:41.8991201Z E         File "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
> 2020-11-09T23:41:41.8991707Z E           self.output(decoded_value)
> 2020-11-09T23:41:41.8992425Z E         File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
> 2020-11-09T23:41:41.8993023Z E         File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
> 2020-11-09T23:41:41.8993656Z E         File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> 2020-11-09T23:41:41.8994447Z E         File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
> 2020-11-09T23:41:41.8995170Z E           with self.scoped_process_state:
> 2020-11-09T23:41:41.8995838Z E         File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 75, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
> 2020-11-09T23:41:41.8996449Z E           self.func(value), self.consumer.output_stream, True)
> 2020-11-09T23:41:41.8996844Z E         File "<string>", line 1, in <lambda>
> 2020-11-09T23:41:41.8997535Z E         File "/__w/2/s/flink-python/pyflink/table/udf.py", line 269, in eval
> 2020-11-09T23:41:41.8997963Z E           return self.func.get_value(accumulator)
> 2020-11-09T23:41:41.8998633Z E         File "/__w/2/s/flink-python/pyflink/table/tests/test_pandas_udaf.py", line 709, in get_value
> 2020-11-09T23:41:41.8999200Z E           self.assertEqual(self.counter_sum, self.counter.get_count())
> 2020-11-09T23:41:41.9000317Z E         File "/__w/2/s/flink-python/dev/.conda/lib/python3.7/unittest/case.py", line 839, in assertEqual
> 2020-11-09T23:41:41.9001165Z E           assertion_func(first, second, msg=msg)
> 2020-11-09T23:41:41.9001865Z E         File "/__w/2/s/flink-python/dev/.conda/lib/python3.7/unittest/case.py", line 832, in _baseAssertEqual
> 2020-11-09T23:41:41.9002396Z E           raise self.failureException(msg)
> 2020-11-09T23:41:41.9002865Z E       AssertionError: 30 != 10
> 2020-11-09T23:41:41.9003240Z E
> 2020-11-09T23:41:41.9003822Z E         at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
> 2020-11-09T23:41:41.9004735Z E         at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
> 2020-11-09T23:41:41.9005621Z E         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
> 2020-11-09T23:41:41.9006612Z E         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
> 2020-11-09T23:41:41.9007519Z E         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
> 2020-11-09T23:41:41.9008389Z E         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
> 2020-11-09T23:41:41.9009224Z E         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
> 2020-11-09T23:41:41.9010178Z E         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
> 2020-11-09T23:41:41.9011130Z E         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> 2020-11-09T23:41:41.9012068Z E         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> 2020-11-09T23:41:41.9012702Z E         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-11-09T23:41:41.9013380Z E         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-11-09T23:41:41.9013805Z E         ... 1 more
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)