alextinng opened a new pull request, #7894:
URL: https://github.com/apache/seatunnel/pull/7894
### Purpose of this pull request
close https://github.com/apache/seatunnel/issues/7886
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Case 1: column count (reader) < column count (writer)
Result: the operation failed. According to the config, SeaTunnel tries to write 2 columns into the table, but the row provides only 1 column (excluding table_name and tags).
SeaTunnel config:
```
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}
source {
  # This is an example input plugin **only for testing and demonstrating the feature input plugin**
  FakeSource {
    row.num = 1
    schema = {
      fields {
        table_name = int
        ts = timestamp
        tenant_id = int
      }
    }
    result_table_name = "fake"
  }
}
sink {
  TDengine {
    source_table_name='fake'
    url : "jdbc:TAOS-RS://localhost:6041/"
    username : "root"
    password : "taosdata"
    database : "test"
    fields: ['ts','latitude']
    stable : "meter"
    timezone: UTC
  }
}
```
test log:
```
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.sql.SQLException: TDengine ERROR (216): sql: INSERT INTO 534905432 using meter tags ( 1.9691318559072168E307 ) ('ts','latitude') VALUES ( '2024-09-24 02:27:45.000' );, desc: syntax error near '1.9691318559072168e307 ) ('ts','latitude') values ( '2024-09-24' (invalid int data)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:253)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:66)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75)
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
    at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)
    at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: TDengine ERROR (216): sql: INSERT INTO 534905432 using meter tags ( 1.9691318559072168E307 ) ('ts','latitude') VALUES ( '2024-09-24 02:27:45.000' );, desc: syntax error near '1.9691318559072168e307 ) ('ts','latitude') values ( '2024-09-24' (invalid int data)
    at com.taosdata.jdbc.TSDBError.createSQLException(TSDBError.java:76)
    at com.taosdata.jdbc.rs.RestfulStatement.execute(RestfulStatement.java:77)
    at com.taosdata.jdbc.rs.RestfulStatement.executeUpdate(RestfulStatement.java:46)
    at org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:109)
    at org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:51)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:249)
    ... 17 more
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
```
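In case 1, the mismatch only shows up as TDengine syntax error 216, raised after the statement has been sent. A check before the SQL is built could report the problem directly.

The sketch below is hypothetical and is not the change made in this PR. `FieldCountCheck` and `checkFieldCount` are illustrative names. It also assumes the caller already knows how many row fields are data values, meaning the fields left after removing the table-name field and the tag fields.

```java
import java.util.List;

// Hypothetical pre-write guard (illustrative only, not part of this PR or the connector).
final class FieldCountCheck {

    // Assumes dataValueCount already excludes the table-name field and the tag fields.
    // Throws a readable error instead of letting TDengine reject malformed SQL.
    static void checkFieldCount(List<String> configuredFields, int dataValueCount) {
        if (configuredFields.size() != dataValueCount) {
            throw new IllegalArgumentException(String.format(
                    "TDengine sink: 'fields' declares %d columns %s, but the row carries %d data values",
                    configuredFields.size(), configuredFields, dataValueCount));
        }
    }

    public static void main(String[] args) {
        List<String> fields = List.of("ts", "latitude");
        checkFieldCount(fields, 2); // case 2 shape: counts match, no exception
        for (int count : new int[] {1, 3}) { // case 1 and case 3 shapes
            try {
                checkFieldCount(fields, count);
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}
```

Failing at this point would name the configured `fields` in the error message, rather than surfacing a TDengine parser complaint about an unrelated token.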
Case 2: column count (reader) == column count (writer)
Result: the data was written to the database successfully.
SeaTunnel config:
```
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}
source {
  # This is an example input plugin **only for testing and demonstrating the feature input plugin**
  FakeSource {
    row.num = 1
    schema = {
      fields {
        table_name = int
        ts = timestamp
        lat = double
        tenant_id = int
      }
    }
    result_table_name = "fake"
  }
}
sink {
  TDengine {
    source_table_name='fake'
    url : "jdbc:TAOS-RS://localhost:6041/"
    username : "root"
    password : "taosdata"
    database : "test"
    fields: ['ts','latitude']
    stable : "meter"
    timezone: UTC
  }
}
```
test log:
```
2024-10-23 20:07:54,149 DEBUG [o.a.s.c.s.f.s.FakeSourceReader] [hz.main.generic-operation.thread-20] - reader 0 add splits [FakeSourceSplit(tableId=fake, splitId=0, rowNum=1)]
2024-10-23 20:07:54,464 DEBUG [c.h.s.i.o.i.InvocationMonitor ] [hz.main.InvocationMonitorThread] - [localhost]:5801 [seatunnel-868391] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2024-10-23 20:07:55,098 INFO [o.a.s.c.s.f.s.FakeSourceReader] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - 1 rows of data have been generated in split(fake_0) for table fake. Generation time: 1729685275089
2024-10-23 20:07:55,098 INFO [o.a.s.c.s.f.s.FakeSourceReader] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - Closed the bounded fake source
2024-10-23 20:07:55,107 DEBUG [a.s.c.s.t.s.TDengineSinkWriter] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - sql content: INSERT INTO 1173840892 using meter tags ( 1730702775 ) ('ts','latitude') VALUES ( '2024-08-04 16:11:32.000',6.408192514684265E307 );
2024-10-23 20:07:55,109 DEBUG [o.a.h.c.p.RequestAddCookies ] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - CookieSpec selected: default
2024-10-23 20:07:55,109 DEBUG [o.a.h.c.p.RequestAuthCache ] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - Auth cache not set in the context
2024-10-23 20:07:55,109 DEBUG [ingHttpClientConnectionManager] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - Connection request: [route: {}->http://10.5.2.26:6041][total available: 1; route allocated: 1 of 20; total allocated: 1 of 200]
```
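The `sql content` line in the log above suggests that the writer fills the `VALUES` list by position, without checking it against the number of names in `fields`. The sketch below reproduces that pattern in isolation.

It is a reconstruction inferred from the logged SQL, not the actual `TDengineSinkWriter` code. The names `PositionalInsertSketch` and `buildInsert` are hypothetical, and so is the use of a comma to separate multiple tags and values; the logs only show that separator for values.

```java
import java.util.List;

// Hypothetical reconstruction of positional INSERT building; NOT the real TDengineSinkWriter code.
final class PositionalInsertSketch {

    // Emits values in row order. Nothing compares values.size() with fields.size(),
    // so a mismatch only surfaces when TDengine parses the statement.
    // Assumption: multiple tags/values are comma-separated, as the logged values are.
    static String buildInsert(String subTable, String stable, List<String> tags,
                              List<String> fields, List<String> values) {
        return "INSERT INTO " + subTable
                + " using " + stable
                + " tags ( " + String.join(",", tags) + " )"
                + " (" + String.join(",", fields) + ")"
                + " VALUES ( " + String.join(",", values) + " );";
    }

    public static void main(String[] args) {
        // Field names keep their single quotes, matching ('ts','latitude') in the logs.
        List<String> fields = List.of("'ts'", "'latitude'");
        // Case 2 shape: two values for two fields.
        System.out.println(buildInsert("1173840892", "meter", List.of("1730702775"), fields,
                List.of("'2024-08-04 16:11:32.000'", "6.408192514684265E307")));
        // Case 1 shape: one value for two fields, which TDengine rejects.
        System.out.println(buildInsert("534905432", "meter", List.of("1.9691318559072168E307"), fields,
                List.of("'2024-09-24 02:27:45.000'")));
    }
}
```

With these inputs, the first printed statement matches the case 2 `sql content` line and the second matches the SQL in the case 1 error. Only the count of values differs between them.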
Case 3: column count (reader) > column count (writer)
Result: the operation failed. According to the config, SeaTunnel should write 2 columns into the table, but the row provides 3 columns (excluding table_name and tags). SeaTunnel currently does not support looking up a column value by column name, so it generates an invalid SQL statement.
SeaTunnel config:
```
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}
source {
  # This is an example input plugin **only for testing and demonstrating the feature input plugin**
  FakeSource {
    row.num = 1
    schema = {
      fields {
        table_name = int
        ts = timestamp
        test_field = string
        lat = double
        tenant_id = int
      }
    }
    result_table_name = "fake"
  }
}
sink {
  TDengine {
    source_table_name='fake'
    url : "jdbc:TAOS-RS://localhost:6041/"
    username : "root"
    password : "taosdata"
    database : "test"
    fields: ['ts','latitude']
    stable : "meter"
    timezone: UTC
  }
}
```
test log:
```
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.sql.SQLException: TDengine ERROR (216): sql: INSERT INTO 1574017200 using meter tags ( 18329617 ) ('ts','latitude') VALUES ( '2024-07-13 19:04:23.000','douag',1.6443365459008203E308 );, desc: syntax error near 'douag' (illegal double data)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:253)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:66)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75)
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
    at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)
    at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: TDengine ERROR (216): sql: INSERT INTO 1574017200 using meter tags ( 18329617 ) ('ts','latitude') VALUES ( '2024-07-13 19:04:23.000','douag',1.6443365459008203E308 );, desc: syntax error near 'douag' (illegal double data)
    at com.taosdata.jdbc.TSDBError.createSQLException(TSDBError.java:76)
    at com.taosdata.jdbc.rs.RestfulStatement.execute(RestfulStatement.java:77)
    at com.taosdata.jdbc.rs.RestfulStatement.executeUpdate(RestfulStatement.java:46)
    at org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:109)
    at org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:51)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:249)
    ... 17 more
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
    ... 2 more
```
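Case 3 fails because the extra `test_field` value is placed into the `VALUES` list by position. Selecting values by column name would let an extra source column be ignored. The sketch below is a minimal illustration of that lookup, not the approach taken in this PR.

It makes several assumptions:
- The row is modelled as parallel lists of field names and values, a simplification of SeaTunnel's row type.
- The table-name and tag fields have already been set aside.
- `NameBasedSelection` and `selectByName` are hypothetical names.

Note that in case 3 the source column is called `lat` while `fields` lists `latitude`. A pure name lookup would therefore also need a mapping between the two, so the example uses matching names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: pick row values by column name rather than by position.
final class NameBasedSelection {

    // Returns the values for sinkFields, in the order the sink declares them.
    // Extra row columns are ignored; a missing column fails fast instead of shifting values.
    static List<Object> selectByName(List<String> rowFieldNames, List<Object> rowValues,
                                     List<String> sinkFields) {
        if (rowFieldNames.size() != rowValues.size()) {
            throw new IllegalArgumentException("Row names and values differ in length");
        }
        Map<String, Object> byName = new LinkedHashMap<>();
        for (int i = 0; i < rowFieldNames.size(); i++) {
            byName.put(rowFieldNames.get(i), rowValues.get(i));
        }
        List<Object> selected = new ArrayList<>();
        for (String field : sinkFields) {
            if (!byName.containsKey(field)) {
                throw new IllegalArgumentException(
                        "Sink field '" + field + "' not found in row fields " + rowFieldNames);
            }
            selected.add(byName.get(field));
        }
        return selected;
    }

    public static void main(String[] args) {
        // Case 3 shape with matching names: test_field is skipped rather than shifted into VALUES.
        List<Object> values = selectByName(
                List.of("ts", "test_field", "latitude"),
                List.of("2024-07-13 19:04:23.000", "douag", 1.6443365459008203E308),
                List.of("ts", "latitude"));
        System.out.println(values);
    }
}
```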
### Check list
* [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
* [ ] If you are contributing the connector code, please check that the following files are updated:
  1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
  2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
  3. Add a CI label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
  4. Add an e2e test case in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
  5. Update connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)
* [ ] Update the [`release-note`](https://github.com/apache/seatunnel/blob/dev/release-note.md).