[ https://issues.apache.org/jira/browse/FLINK-15928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Caizhi Weng closed FLINK-15928.
-------------------------------
    Resolution: Cannot Reproduce

> Batch mode in blink planner caused IndexOutOfBoundsException error
> ------------------------------------------------------------------
>
>                 Key: FLINK-15928
>                 URL: https://issues.apache.org/jira/browse/FLINK-15928
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.9.2
>            Reporter: Fanbin Bu
>            Priority: Minor
>              Labels: auto-deprioritized-major
>
> Flink version: 1.9.2
> mode: Batch mode, running on EMR with YARN
> Details are as follows:
>  
> table source sample:
> {code:scala}
> class SnowflakeTableSource(
>     val schema: TableSchema,
>     val parallelism: Int,
>     val fetchSize: Int,
>     val query: String,
>     val options: SnowflakeOptions)
>   extends StreamTableSource[Row] {
>
>   override def getDataStream(execEnv: StreamExecutionEnvironment): SingleOutputStreamOperator[Row] = {
>     execEnv.createInput(getInputFormat, getReturnType).name("app_event_stream")
>   }
>
>   override def getReturnType: TypeInformation[Row] = schema.toRowType
>   override def getTableSchema: TableSchema = schema
>   override def isBounded: Boolean = true
>
>   private def getInputFormat: JDBCInputFormat = {
>     JDBCInputFormat.buildJDBCInputFormat
>       .setDrivername(options.driverName)
>       .setDBUrl(options.dbUrl)
>       .setUsername(options.username)
>       .setPassword(options.password)
>       .setQuery(query)
>       .setRowTypeInfo(getInputRowTypeInfo)
>       .setFetchSize(fetchSize)
>       .setParametersProvider(new GenericParameterValuesProvider(buildQueryParams(parallelism)))
>       .finish
>   }
> }
> {code}
>  
> Here is the sample setup code:
> {code:scala}
> val settings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inBatchMode()
>   .build()
> val tableEnv = TableEnvironment.create(settings)
> val configurations = tableEnv.getConfig.getConfiguration
> configurations.setString(
>   TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY.key,
>   s"${Globals.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY} mb")
> tableEnv.registerTableSource(tableName, tableSource)
> queryResult = tableEnv.sqlQuery(sql)
> tableEnv.execute()
> {code}
>  
> Here is the sample SQL:
> {code:sql}
> select
>   ip_address
>   , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>   , sum(case when name = 'signin' then 1 else 0 end) as signin_count_1m
>   , sum(case when name = 'signin_failure' then 1 else 0 end) as signin_failure_count_1m
>   ...
> from events
> group by
>   ip_address
>   , hop(created_at, interval '30' second, interval '1' minute)
> {code}
>  
> Here is the stacktrace:
> {code}
> java.lang.IndexOutOfBoundsException
>     at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
>     at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
>     at HashWinAggWithKeys$538.endInput(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> The same code works with other SQL queries, and the stack trace fails in a
> MemorySegment read, which suggests that this might be a memory-related issue.
> It only happens with the blink planner in batch mode. I tried
> BatchTableEnvironment with the old planner and it works.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
