hawk9821 opened a new pull request, #9452:
URL: https://github.com/apache/seatunnel/pull/9452
### Purpose of this pull request
Source: MySQL
```
CREATE TABLE `product` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
`name` varchar(100) DEFAULT NULL COMMENT '商品名称',
`price` decimal(4,1) DEFAULT NULL COMMENT '单价',
PRIMARY KEY (`id`)
) COMMENT='商品表' ;
insert into product values (1,'product1', 101.1);
insert into product values (2,'product2', 999.9);
```
Sink: Paimon table, created with Flink SQL
```
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'warehouse' = '/tmp/paimon'
);
use catalog `paimon`;
create database if not exists `test`;
CREATE TABLE `test`.`product` (
`id` int NOT NULL COMMENT '主键',
`name` varchar(100) COMMENT '商品名称',
`price` decimal(4,2) COMMENT '单价',
CONSTRAINT `PK_id` PRIMARY KEY (`id`) NOT ENFORCED
)
```
SeaTunnel job config:
```
env {
parallelism = 1
job.mode = "BATCH"
}
source {
jdbc {
url = "jdbc:mysql://127.0.0.1:3306/source?connectTimeout=5000"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "*****"
password = "******"
table_path = "source.product"
split.size = 8096
plugin_output = "table_info"
}
}
sink {
Paimon {
plugin_input = "table_info"
catalog_name = "seatunnel_test"
warehouse = "/tmp/paimon"
database = "test"
table = "product"
generate_sink_sql = true
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
}
}
```
Exception before the fix: the job fails with a bare `NullPointerException` in `writeDecimal`, which gives no hint about the cause and makes the problem hard to locate.
```
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:309)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:203)
	... 17 more
Caused by: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.checkQueueRemain(MultiTableSinkWriter.java:358)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:289)
	... 18 more
Caused by: java.lang.NullPointerException
	at org.apache.paimon.data.AbstractBinaryWriter.writeDecimal(AbstractBinaryWriter.java:128)
	at org.apache.paimon.data.BinaryRowWriter.writeDecimal(BinaryRowWriter.java:25)
	at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.reconvert(RowConverter.java:412)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:191)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:71)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67)
	... 6 more
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more
```
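Exception after the fix: the root cause is now a `PaimonConnectorException` that names the field, the offending value, and both the expected and the actual `DECIMAL` type of the sink column.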
```
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:309)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:203)
	... 17 more
Caused by: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.checkQueueRemain(MultiTableSinkWriter.java:358)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:289)
	... 18 more
Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-11], ErrorDescription:[deciaml type precision is incompatible. ] - `price` field value is: 101.1, except filed schema of sink is `price` DECIMAL(4, 1), but the filed in sink table which actual schema is `price` DECIMAL(4, 2) '单价'.Please check schema of sink table.
	at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.checkCanWriteWithSchema(RowConverter.java:542)
	at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.reconvert(RowConverter.java:392)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:191)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:71)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67)
	... 6 more
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more
```
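The mismatch in this reproduction is plain arithmetic: `101.1` rescaled to two decimal places is `101.10`, which needs five digits, while the sink column `DECIMAL(4, 2)` allows only four. Below is a minimal sketch of a pre-write check of that kind, using only `java.math.BigDecimal`. The class and method names (`DecimalFitCheck`, `fits`, `requireFits`) and the message wording are hypothetical and chosen for illustration; this is not the code added to `RowConverter.checkCanWriteWithSchema` in this PR.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper for illustration only; not part of SeaTunnel or Paimon.
class DecimalFitCheck {

    /**
     * Returns true if the value, rescaled to the column's scale,
     * uses no more digits than the column's precision allows.
     */
    static boolean fits(BigDecimal value, int precision, int scale) {
        BigDecimal rescaled = value.setScale(scale, RoundingMode.HALF_UP);
        return rescaled.precision() <= precision;
    }

    /** Fails early with a descriptive message instead of a later NullPointerException. */
    static void requireFits(String field, BigDecimal value, int precision, int scale) {
        if (!fits(value, precision, scale)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Field `%s` value %s does not fit sink column DECIMAL(%d, %d)",
                            field, value.toPlainString(), precision, scale));
        }
    }

    public static void main(String[] args) {
        BigDecimal price = new BigDecimal("101.1");
        System.out.println(fits(price, 4, 1)); // true: matches the MySQL source column
        System.out.println(fits(price, 4, 2)); // false: 101.10 needs 5 digits
        requireFits("price", price, 4, 2);     // throws IllegalArgumentException
    }
}
```

Both sample rows (`101.1` and `999.9`) fail this check against `DECIMAL(4, 2)`, so in this setup either the sink column would need to be at least `DECIMAL(5, 2)` or its scale would need to match the source.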
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
### Check list
* [ ] If your PR adds any new Jar binary package, please add a License Notice according to the [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
* [ ] If you are contributing connector code, please check that the following files are updated:
  1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add the new connector information to it
  2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
  3. Add a CI label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
  4. Add an e2e test case in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
  5. Update the connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)