[
https://issues.apache.org/jira/browse/FLINK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Fangliang Liu updated FLINK-24961:
----------------------------------
Description:
DDL:
{code:java}
CREATE TABLE IF NOT EXISTS table_a (
`user_id` BIGINT NULL COMMENT '',
`id` BIGINT NULL COMMENT '',
`position_id` BIGINT NULL COMMENT '',
`status` STRING NULL COMMENT '',
`transaction_id` BIGINT NULL COMMENT '',
PRIMARY KEY (`user_id`, `id`) NOT ENFORCED
) WITH (
'connector'='kafka',
'topic'='xxxx',
'properties.bootstrap.servers'='xxx',
'properties.group.id'='xxx',
'properties.auto.offset.reset'='earliest',
'scan.startup.mode'='earliest-offset',
'format'='debezium-avro-confluent',
'debezium-avro-confluent.schema-registry.url'='xxxx'
);
CREATE TABLE IF NOT EXISTS table_b (
`user_id` BIGINT NULL COMMENT '',
`id` BIGINT NULL COMMENT '',
`position_id` BIGINT NULL COMMENT '',
`status` STRING NULL COMMENT '',
`transaction_id` BIGINT NULL COMMENT ''
) WITH (
'connector' = 'tidb',
'tidb.database.url' = 'jdbc:mysql://xxxx',
'tidb.username' = 'xxxx',
'tidb.password' = 'xxxxx',
'tidb.database.name' = 'xxxxx',
'tidb.maximum.pool.size' = '1',
'tidb.minimum.idle.size' = '1',
'tidb.table.name' = 'withdraws',
'tidb.write_mode' = 'upsert',
'sink.buffer-flush.max-rows' = '0'
);
INSERT INTO table_b SELECT * FROM table_a;
{code}
The actual schema in TiDB has one more auto-increment column than table_b, and
the following error is reported when the job starts:
{code:java}
2021-11-19 07:55:36,985 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, kafka_rt_ods_bybitprod, withdraws]], fields=[user_id, id, position_id, coin, status, transaction_id, amount, fee, address, admin_id, reason, confirm_code, txid, submited_at, confirmed_at, verified_at, packed_at, broadcasted_at, successed_at, canceled_at, rejected_at, expired_at, destination_tag, updated_at, risk_tags, risk_level, risk_status, first_review_result, first_review_admin_id, first_review_desc, first_review_at, final_review_result]) -> DropUpdateBefore -> Sink: Sink(table=[default_catalog.tidb_rt_ods_bybitprod.withdraws], fields=[user_id, id, position_id, coin, status, transaction_id, amount, fee, address, admin_id, reason, confirm_code, txid, submited_at, confirmed_at, verified_at, packed_at, broadcasted_at, successed_at, canceled_at, rejected_at, expired_at, destination_tag, updated_at, risk_tags, risk_level, risk_status, first_review_result, first_review_admin_id, first_review_desc, first_review_at, final_review_result]) (1/1) (238d9e5c8a275d7427fa87d908cda1a3) switched from INITIALIZING to FAILED on container_e14_1627389692587_137379_01_000002 @ ip-10-60-53-37.ap-southeast-1.compute.internal (dataPort=41325).
java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createBufferReduceExecutor$1(JdbcDynamicOutputFormatBuilder.java:145) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_291]
    at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_291]
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_291]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_291]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_291]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_291]
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_291]
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_291]
    at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.createBufferReduceExecutor(JdbcDynamicOutputFormatBuilder.java:145) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$build$edc08011$1(JdbcDynamicOutputFormatBuilder.java:106) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:142) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:116) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-tidb-connector-1.13-0.0.4.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_291]
2021-11-19 07:55:36,996 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 25e6800fc67392651c32db54b2fcc483
{code}
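
Reading the trace, the exception comes out of a stream pipeline in JdbcDynamicOutputFormatBuilder.createBufferReduceExecutor that maps primary-key field names to positions in the DDL's field-name list and then uses each position as an array index. If the key fields are resolved against the physical table (which here has an auto-increment primary-key column that the DDL does not declare), the lookup returns -1 and the array access fails. The following is a minimal, self-contained sketch of that suspected failure mode; it is an illustration, not the connector's actual code, and the column name auto_id plus the simplified types array are hypothetical:
{code:java}
import java.util.Arrays;
import java.util.List;

// Sketch (not Flink code) of the suspected failure mode behind
// "ArrayIndexOutOfBoundsException: -1" at JdbcDynamicOutputFormatBuilder:145.
public class PkIndexLookupSketch {
    public static void main(String[] args) {
        // Field names as declared in the Flink DDL (table_b above).
        String[] ddlFieldNames = {"user_id", "id", "position_id", "status", "transaction_id"};
        // Key fields as resolved against the physical TiDB table, which carries
        // an extra auto-increment primary-key column ("auto_id" is hypothetical).
        String[] keyFieldNames = {"auto_id", "user_id", "id"};

        List<String> fieldList = Arrays.asList(ddlFieldNames);
        // indexOf yields -1 for any key column that is missing from the DDL...
        int[] pkFields = Arrays.stream(keyFieldNames).mapToInt(fieldList::indexOf).toArray();

        // ...and using -1 as an array index then throws
        // java.lang.ArrayIndexOutOfBoundsException: -1, as in the log above.
        String[] fieldTypes = {"BIGINT", "BIGINT", "BIGINT", "STRING", "BIGINT"};
        String[] pkTypes =
                Arrays.stream(pkFields).mapToObj(f -> fieldTypes[f]).toArray(String[]::new);
        System.out.println(Arrays.toString(pkTypes));
    }
}
{code}
A more helpful behavior for the JDBC sink would be to validate each lookup and fail fast with a message naming the key column that is missing from the DDL, rather than surfacing a bare ArrayIndexOutOfBoundsException.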
> When the DDL statement differs from the actual schema in the database, an
> ArrayIndexOutOfBoundsException is thrown
> ----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24961
> URL: https://issues.apache.org/jira/browse/FLINK-24961
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.13.2
> Reporter: Fangliang Liu
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)