[ https://issues.apache.org/jira/browse/FLINK-15928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Caizhi Weng closed FLINK-15928.
-------------------------------
Resolution: Cannot Reproduce
> Batch mode in blink planner caused IndexOutOfBoundsException error
> ------------------------------------------------------------------
>
> Key: FLINK-15928
> URL: https://issues.apache.org/jira/browse/FLINK-15928
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.9.2
> Reporter: Fanbin Bu
> Priority: Minor
> Labels: auto-deprioritized-major
>
> Flink version: 1.9.2
> Mode: batch mode, running on EMR with YARN
> The details are as follows:
>
> Table source sample:
> {code:scala}
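> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
> import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.table.api.TableSchema
> import org.apache.flink.table.sources.StreamTableSource
> import org.apache.flink.types.Row
>
> // SnowflakeOptions, getInputRowTypeInfo and buildQueryParams are part of the
> // reporter's code and are not shown in this issue.
> class SnowflakeTableSource(
>     val schema: TableSchema,
>     val parallelism: Int,
>     val fetchSize: Int,
>     val query: String,
>     val options: SnowflakeOptions)
>   extends StreamTableSource[Row] {
>
>   // Reads the query result through the JDBC input format defined below.
>   override def getDataStream(execEnv: StreamExecutionEnvironment): SingleOutputStreamOperator[Row] =
>     execEnv.createInput(getInputFormat, getReturnType).name("app_event_stream")
>
>   override def getReturnType: TypeInformation[Row] = schema.toRowType
>
>   override def getTableSchema: TableSchema = schema
>
>   // Declares the source as finite, which batch execution requires.
>   override def isBounded: Boolean = true
>
>   // One input split is created per parameter array returned by the parameters provider.
>   private def getInputFormat: JDBCInputFormat =
>     JDBCInputFormat.buildJDBCInputFormat
>       .setDrivername(options.driverName)
>       .setDBUrl(options.dbUrl)
>       .setUsername(options.username)
>       .setPassword(options.password)
>       .setQuery(query)
>       .setRowTypeInfo(getInputRowTypeInfo)
>       .setFetchSize(fetchSize)
>       .setParametersProvider(
>         new GenericParameterValuesProvider(buildQueryParams(parallelism)))
>       .finish
> }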
> {code}
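>
> The report does not show {{buildQueryParams}} or the query text. With {{GenericParameterValuesProvider}}, each inner array holds the values bound, in order, to the {{?}} placeholders of the query for one input split. A minimal sketch of such a helper, assuming a hypothetical query ending in {{WHERE MOD(HASH(id), ?) = ?}} so that each of the {{parallelism}} splits reads a disjoint bucket (the {{id}} column and the bucketing scheme are illustrative assumptions, not the reporter's code):
> {code:scala}
> import java.io.Serializable
>
> object QueryParams {
>   // Hypothetical helper: the report calls buildQueryParams(parallelism) without showing it.
>   // Assumes the query ends with "... WHERE MOD(HASH(id), ?) = ?".
>   // Each inner array is one input split: (bucket count, bucket index).
>   def buildQueryParams(parallelism: Int): Array[Array[Serializable]] = {
>     require(parallelism > 0, "parallelism must be positive")
>     Array.tabulate(parallelism) { split =>
>       Array[Serializable](Int.box(parallelism), Int.box(split))
>     }
>   }
> }
> {code}
>
> With {{parallelism = 4}}, this yields the parameter arrays {{(4, 0)}}, {{(4, 1)}}, {{(4, 2)}} and {{(4, 3)}}, one per split.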
>
> Here is the sample setup code:
> {code:scala}
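> import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
> import org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY
>
> val settings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inBatchMode()
>   .build()
> val tableEnv = TableEnvironment.create(settings)
>
> // Hash-aggregation memory as a size string with an "mb" suffix;
> // Globals is the reporter's own settings object (not shown).
> val configuration = tableEnv.getConfig.getConfiguration
> configuration.setString(
>   TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY.key,
>   s"${Globals.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY} mb")
>
> // tableName, tableSource and sql are defined elsewhere in the reporter's code.
> tableEnv.registerTableSource(tableName, tableSource)
> val queryResult = tableEnv.sqlQuery(sql)
> // The sink that consumes queryResult is not shown in the report.
> // In Flink 1.9, TableEnvironment.execute takes a job name (placeholder below).
> tableEnv.execute("snowflake-hop-aggregation")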
> {code}
>
> Here is the sample SQL:
> {code:sql}
> select
> ip_address
> , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
> , sum(case when name = 'signin' then 1 else 0 end) as signin_count_1m
> , sum(case when name = 'signin_failure' then 1 else 0 end) as signin_failure_count_1m
> ...
> from events
> group by
> ip_address
> , hop(created_at, interval '30' second, interval '1' minute)
> {code}
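>
> With a 30-second slide and a 1-minute size, {{HOP}} assigns every event to two overlapping windows, and {{hop_end}} returns each window's exclusive end. A minimal in-memory sketch of what the query computes for the two columns shown, assuming {{created_at}} is given as non-negative epoch milliseconds. The window assignment follows the usual sliding-window rule (latest start = timestamp rounded down to a multiple of the slide); this is not the generated {{HashWinAggWithKeys}} operator, and the {{Event}} and {{Result}} types are hypothetical.
> {code:scala}
> object HopAggSketch {
>   // Hypothetical event shape; createdAt is epoch milliseconds (assumed non-negative).
>   final case class Event(ipAddress: String, name: String, createdAt: Long)
>
>   final case class Result(ipAddress: String, bucketTs: Long, signinCount: Long, signinFailureCount: Long)
>
>   val SlideMs: Long = 30 * 1000L // interval '30' second
>   val SizeMs: Long = 60 * 1000L  // interval '1' minute
>
>   // Starts of all hop windows [start, start + size) that contain ts: begin at ts
>   // rounded down to the slide, then step back by the slide while the window still covers ts.
>   def windowStarts(ts: Long): Seq[Long] = {
>     val lastStart = ts - (ts % SlideMs)
>     Iterator.iterate(lastStart)(_ - SlideMs).takeWhile(_ > ts - SizeMs).toList
>   }
>
>   def aggregate(events: Seq[Event]): Seq[Result] = {
>     // Key every (event, window) pair by (ip_address, hop_end).
>     val keyed = for {
>       e <- events
>       start <- windowStarts(e.createdAt)
>     } yield ((e.ipAddress, start + SizeMs), e)
>
>     keyed.groupBy(_._1).toSeq.map { case ((ip, end), rows) =>
>       val names = rows.map(_._2.name)
>       Result(ip, end,
>         names.count(_ == "signin").toLong,         // sum(case when name = 'signin' ...)
>         names.count(_ == "signin_failure").toLong) // sum(case when name = 'signin_failure' ...)
>     }.sortBy(r => (r.ipAddress, r.bucketTs))
>   }
> }
> {code}
>
> For example, an event at 45 s falls into the windows ending at 60 s and 90 s, so it is counted in both buckets.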
>
> Here is the stacktrace:
> {code}
> java.lang.IndexOutOfBoundsException
>     at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
>     at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
>     at HashWinAggWithKeys$538.endInput(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> The fact that the same code works with other SQL queries, together with the stack
> trace, suggests that this may be a memory issue. It only happens with the Blink
> planner in batch mode; running the same job through BatchTableEnvironment in the
> old planner works.
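>
> The top frames show the generated {{HashWinAggWithKeys$538}} code reading an int field through {{BinaryRow.getInt}} in {{endInput}}, and the read failing in {{MemorySegment.getInt}}: the requested offset was negative or too close to the end of the segment to read 4 bytes. A minimal sketch of that failure mode, using {{java.nio.ByteBuffer}} as a stand-in for Flink's {{MemorySegment}} and a hypothetical simplified row layout of one 8-byte slot per field (the real {{BinaryRow}} also has header and null bits):
> {code:scala}
> import java.nio.ByteBuffer
>
> object BoundsSketch {
>   // Hypothetical simplified row: one 8-byte slot per field, no header or null bits.
>   val SlotBytes = 8
>
>   def newRow(arity: Int): ByteBuffer = ByteBuffer.allocate(arity * SlotBytes)
>
>   def setIntField(row: ByteBuffer, field: Int, value: Int): Unit =
>     row.putInt(field * SlotBytes, value)
>
>   // An absolute getInt whose offset + 4 exceeds the buffer limit throws
>   // java.lang.IndexOutOfBoundsException, the same exception type as in the trace.
>   def getIntField(row: ByteBuffer, field: Int): Int =
>     row.getInt(field * SlotBytes)
> }
> {code}
>
> A read whose field offset does not match the memory actually backing the row fails the same way. The sketch only illustrates what the exception signals; it does not establish the root cause of this report.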
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)