[
https://issues.apache.org/jira/browse/FLINK-20046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dian Fu updated FLINK-20046:
----------------------------
Labels: test-stability (was: )
> StreamTableAggregateTests.test_map_view_iterate is unstable
> -----------------------------------------------------------
>
> Key: FLINK-20046
> URL: https://issues.apache.org/jira/browse/FLINK-20046
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Priority: Major
> Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279&view=logs&j=821b528f-1eed-5598-a3b4-7f748b13f261&t=4fad9527-b9a5-5015-1b70-8356e5c91490
> {code}
> 2020-11-07T22:50:57.4180758Z _______________
> StreamTableAggregateTests.test_map_view_iterate ________________
> 2020-11-07T22:50:57.4181301Z
> 2020-11-07T22:50:57.4181965Z self =
> <pyflink.table.tests.test_aggregate.StreamTableAggregateTests
> testMethod=test_map_view_iterate>
> 2020-11-07T22:50:57.4182348Z
> 2020-11-07T22:50:57.4182535Z def test_map_view_iterate(self):
> 2020-11-07T22:50:57.4182812Z test_iterate =
> udaf(TestIterateAggregateFunction())
> 2020-11-07T22:50:57.4183320Z
> self.t_env.get_config().set_idle_state_retention(datetime.timedelta(days=1))
> 2020-11-07T22:50:57.4183763Z
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4297555Z "python.fn-execution.bundle.size",
> "2")
> 2020-11-07T22:50:57.4297922Z # trigger the cache eviction in a bundle.
> 2020-11-07T22:50:57.4308028Z
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4308653Z "python.state.cache-size", "2")
> 2020-11-07T22:50:57.4308945Z
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4309382Z "python.map-state.read-cache-size",
> "2")
> 2020-11-07T22:50:57.4309676Z
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4310428Z "python.map-state.write-cache-size",
> "2")
> 2020-11-07T22:50:57.4310701Z
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4311130Z
> "python.map-state.iterate-response-batch-size", "2")
> 2020-11-07T22:50:57.4311361Z t = self.t_env.from_elements(
> 2020-11-07T22:50:57.4311691Z [(1, 'Hi_', 'hi'),
> 2020-11-07T22:50:57.4312004Z (1, 'Hi', 'hi'),
> 2020-11-07T22:50:57.4312316Z (2, 'hello', 'hello'),
> 2020-11-07T22:50:57.4312639Z (3, 'Hi_', 'hi'),
> 2020-11-07T22:50:57.4312975Z (3, 'Hi', 'hi'),
> 2020-11-07T22:50:57.4313285Z (4, 'hello', 'hello'),
> 2020-11-07T22:50:57.4313609Z (5, 'Hi2_', 'hi'),
> 2020-11-07T22:50:57.4313908Z (5, 'Hi2', 'hi'),
> 2020-11-07T22:50:57.4314238Z (6, 'hello2', 'hello'),
> 2020-11-07T22:50:57.4314558Z (7, 'Hi', 'hi'),
> 2020-11-07T22:50:57.4315053Z (8, 'hello', 'hello'),
> 2020-11-07T22:50:57.4315396Z (9, 'Hi2', 'hi'),
> 2020-11-07T22:50:57.4315773Z (13, 'Hi3', 'hi')], ['a', 'b', 'c'])
> 2020-11-07T22:50:57.4316023Z
> self.t_env.create_temporary_view("source", t)
> 2020-11-07T22:50:57.4316299Z table_with_retract_message =
> self.t_env.sql_query(
> 2020-11-07T22:50:57.4316615Z "select LAST_VALUE(b) as b,
> LAST_VALUE(c) as c from source group by a")
> 2020-11-07T22:50:57.4316919Z result =
> table_with_retract_message.group_by(t.c) \
> 2020-11-07T22:50:57.4317197Z
> .select(test_iterate(t.b).alias("a"), t.c) \
> 2020-11-07T22:50:57.4317619Z .select(col("a").get(0).alias("a"),
> 2020-11-07T22:50:57.4318111Z col("a").get(1).alias("b"),
> 2020-11-07T22:50:57.4318357Z col("a").get(2).alias("c"),
> 2020-11-07T22:50:57.4318586Z col("a").get(3).alias("d"),
> 2020-11-07T22:50:57.4318814Z t.c.alias("e"))
> 2020-11-07T22:50:57.4319023Z assert_frame_equal(
> 2020-11-07T22:50:57.4319208Z > result.to_pandas(),
> 2020-11-07T22:50:57.4319408Z pd.DataFrame([
> 2020-11-07T22:50:57.4319872Z ["hello,hello2", "1,3",
> 'hello:3,hello2:1', 2, "hello"],
> 2020-11-07T22:50:57.4320398Z ["Hi,Hi2,Hi3", "1,2,3",
> "Hi:3,Hi2:2,Hi3:1", 3, "hi"]],
> 2020-11-07T22:50:57.4321047Z columns=['a', 'b', 'c', 'd',
> 'e']))
> 2020-11-07T22:50:57.4321198Z
> 2020-11-07T22:50:57.4321385Z pyflink/table/tests/test_aggregate.py:468:
> 2020-11-07T22:50:57.4321648Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> 2020-11-07T22:50:57.4322040Z pyflink/table/table.py:807: in to_pandas
> 2020-11-07T22:50:57.4322299Z .collectAsPandasDataFrame(self._j_table,
> max_arrow_batch_size)
> 2020-11-07T22:50:57.4322794Z
> .tox/py35-cython/lib/python3.5/site-packages/py4j/java_gateway.py:1286: in
> __call__
> 2020-11-07T22:50:57.4323103Z answer, self.gateway_client, self.target_id,
> self.name)
> 2020-11-07T22:50:57.4323351Z pyflink/util/exceptions.py:147: in deco
> 2020-11-07T22:50:57.4323537Z return f(*a, **kw)
> 2020-11-07T22:50:57.4323783Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> 2020-11-07T22:50:57.4323963Z
> 2020-11-07T22:50:57.4324225Z answer = 'xro8653'
> 2020-11-07T22:50:57.4324496Z gateway_client =
> <py4j.java_gateway.GatewayClient object at 0x7fe5c619db70>
> 2020-11-07T22:50:57.4324943Z target_id =
> 'z:org.apache.flink.table.runtime.arrow.ArrowUtils'
> 2020-11-07T22:50:57.4325312Z name = 'collectAsPandasDataFrame'
> 2020-11-07T22:50:57.4325439Z
> 2020-11-07T22:50:57.4325839Z def get_return_value(answer, gateway_client,
> target_id=None, name=None):
> 2020-11-07T22:50:57.4326420Z """Converts an answer received from the
> Java gateway into a Python object.
> 2020-11-07T22:50:57.4326648Z
> 2020-11-07T22:50:57.4326881Z For example, string representation of
> integers are converted to Python
> 2020-11-07T22:50:57.4327193Z integer, string representation of
> objects are converted to JavaObject
> 2020-11-07T22:50:57.4327451Z instances, etc.
> 2020-11-07T22:50:57.4327614Z
> 2020-11-07T22:50:57.4327819Z :param answer: the string returned by
> the Java gateway
> 2020-11-07T22:50:57.4328157Z :param gateway_client: the gateway
> client used to communicate with the Java
> 2020-11-07T22:50:57.4329738Z Gateway. Only necessary if the
> answer is a reference (e.g., object,
> 2020-11-07T22:50:57.4330018Z list, map)
> 2020-11-07T22:50:57.4330273Z :param target_id: the name of the object
> from which the answer comes from
> 2020-11-07T22:50:57.4330588Z (e.g., *object1* in
> `object1.hello()`). Optional.
> 2020-11-07T22:50:57.4330873Z :param name: the name of the member from
> which the answer comes from
> 2020-11-07T22:50:57.4331170Z (e.g., *hello* in
> `object1.hello()`). Optional.
> 2020-11-07T22:50:57.4331375Z """
> 2020-11-07T22:50:57.4331542Z if is_error(answer)[0]:
> 2020-11-07T22:50:57.4331761Z if len(answer) > 1:
> 2020-11-07T22:50:57.4331954Z type = answer[1]
> 2020-11-07T22:50:57.4332222Z value =
> OUTPUT_CONVERTER[type](answer[2:], gateway_client)
> 2020-11-07T22:50:57.4332531Z if answer[1] == REFERENCE_TYPE:
> 2020-11-07T22:50:57.4332757Z raise Py4JJavaError(
> 2020-11-07T22:50:57.4333016Z "An error occurred while
> calling {0}{1}{2}.\n".
> 2020-11-07T22:50:57.4333303Z > format(target_id, ".",
> name), value)
> 2020-11-07T22:50:57.4333700Z E py4j.protocol.Py4JJavaError:
> An error occurred while calling
> z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
> 2020-11-07T22:50:57.4334558Z E :
> java.lang.RuntimeException: Could not remove element ',,,1,hi', should never
> happen.
> 2020-11-07T22:50:57.4335019Z E at
> org.apache.flink.table.runtime.arrow.ArrowUtils.filterOutRetractRows(ArrowUtils.java:708)
> 2020-11-07T22:50:57.4335479Z E at
> org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:635)
> 2020-11-07T22:50:57.4336238Z E at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-11-07T22:50:57.4336645Z E at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-11-07T22:50:57.4337099Z E at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-11-07T22:50:57.4337485Z E at
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-11-07T22:50:57.4337911Z E at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 2020-11-07T22:50:57.4338410Z E at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 2020-11-07T22:50:57.4338859Z E at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> 2020-11-07T22:50:57.4339324Z E at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 2020-11-07T22:50:57.4339810Z E at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> 2020-11-07T22:50:57.4340260Z E at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> 2020-11-07T22:50:57.4340651Z E at
> java.lang.Thread.run(Thread.java:748)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)