[
https://issues.apache.org/jira/browse/FLINK-15928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Caizhi Weng updated FLINK-15928:
--------------------------------
Description:
Flink version: 1.9.2
Mode: batch mode, running on EMR with YARN
The details are as follows.
Table source sample:
{code:scala}
// Imports for Flink 1.9 (flink-streaming-java, flink-table, flink-jdbc).
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

// SnowflakeOptions, getInputRowTypeInfo and buildQueryParams are defined
// elsewhere in the reporter's code base.
class SnowflakeTableSource(val schema: TableSchema,
                           val parallelism: Int,
                           val fetchSize: Int,
                           val query: String,
                           val options: SnowflakeOptions)
  extends StreamTableSource[Row] {

  override def getDataStream(execEnv: StreamExecutionEnvironment): SingleOutputStreamOperator[Row] =
    execEnv.createInput(getInputFormat, getReturnType).name("app_event_stream")

  override def getReturnType: TypeInformation[Row] = schema.toRowType

  override def getTableSchema: TableSchema = schema

  override def isBounded: Boolean = true

  // Builds a parallel JDBC source; each parameter row from buildQueryParams
  // binds the query placeholders for one input split.
  private def getInputFormat: JDBCInputFormat =
    JDBCInputFormat.buildJDBCInputFormat
      .setDrivername(options.driverName)
      .setDBUrl(options.dbUrl)
      .setUsername(options.username)
      .setPassword(options.password)
      .setQuery(query)
      .setRowTypeInfo(getInputRowTypeInfo)
      .setFetchSize(fetchSize)
      .setParametersProvider(new GenericParameterValuesProvider(buildQueryParams(parallelism)))
      .finish
}
{code}
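For context, {{GenericParameterValuesProvider}} takes a {{Serializable[][]}}: one inner array of bind values per parallel split, each filling the {{?}} placeholders of the query for one split. The reporter's {{buildQueryParams}} is not shown; a purely hypothetical sketch might look like this:
{code:scala}
import java.io.Serializable

// Hypothetical illustration only -- the actual buildQueryParams is not shown.
// Produces one parameter row per split, e.g. binding a split/bucket id to a
// single "?" placeholder in the query.
def buildQueryParams(parallelism: Int): Array[Array[Serializable]] =
  (0 until parallelism)
    .map(i => Array[Serializable](Int.box(i)))
    .toArray
{code}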
Here is the sample setup code:
{code:scala}
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()
val tableEnv = TableEnvironment.create(settings)
val configurations = tableEnv.getConfig.getConfiguration
configurations.setString(
  TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY.key,
  s"${Globals.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY} mb")
tableEnv.registerTableSource(tableName, tableSource)
val queryResult = tableEnv.sqlQuery(sql)
// TableEnvironment#execute takes a job name in Flink 1.9.
tableEnv.execute("snowflake-batch-job")
{code}
Here is the sample SQL:
{code:sql}
select
    ip_address
  , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
  , sum(case when name = 'signin' then 1 else 0 end) as signin_count_1m
  , sum(case when name = 'signin_failure' then 1 else 0 end) as signin_failure_count_1m
  ...
from events
group by
    ip_address
  , hop(created_at, interval '30' second, interval '1' minute)
{code}
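This groups by a sliding (HOP) window with a 30-second slide and a 1-minute size, so each row falls into size/slide = 2 overlapping windows. A minimal sketch of the standard sliding-window assignment arithmetic (an illustration of the semantics, not Flink's internal code):
{code:scala}
// Returns the start timestamps of every window [start, start + size)
// containing ts. Assumes ts >= 0 and millisecond units.
def hopWindowStarts(ts: Long, slideMs: Long, sizeMs: Long): Seq[Long] = {
  val lastStart = ts - (ts % slideMs) // latest window start at or before ts
  (lastStart - sizeMs + slideMs) to lastStart by slideMs
}

// Example: an event at 75s with slide 30s / size 60s lands in the two
// windows starting at 30s and 60s:
// hopWindowStarts(75000L, 30000L, 60000L) == Seq(30000L, 60000L)
{code}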
Here is the stacktrace:
{code}
java.lang.IndexOutOfBoundsException
	at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
	at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
	at HashWinAggWithKeys$538.endInput(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
{code}
The same code works fine with other SQL queries, which, together with the stack trace, suggests a memory-related issue. It only happens with the Blink planner in batch mode: when I run the query with BatchTableEnvironment on the old planner, it works.
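For reference, a minimal sketch of the old-planner setup that worked (the reporter's exact workaround code is not shown, so this is an assumption based on the Flink 1.9 Scala API):
{code:scala}
// Old (legacy) planner workaround sketch. Note: the old batch planner expects
// a BatchTableSource[Row] (getDataSet) rather than the StreamTableSource
// shown above, so the source would need a batch variant.
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
tableEnv.registerTableSource(tableName, batchTableSource) // hypothetical BatchTableSource
val queryResult = tableEnv.sqlQuery(sql)
{code}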
> Batch mode in blink planner caused IndexOutOfBoundsException error
> ------------------------------------------------------------------
>
> Key: FLINK-15928
> URL: https://issues.apache.org/jira/browse/FLINK-15928
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.9.2
> Reporter: Fanbin Bu
> Priority: Minor
> Labels: auto-deprioritized-major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)