alextinng opened a new pull request, #7894:
URL: https://github.com/apache/seatunnel/pull/7894

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code 
changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
     - Make sure that the pull request corresponds to a [GITHUB 
issue](https://github.com/apache/seatunnel/issues).
     - Name the pull request in the form "[Feature] [component] Title of the 
pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix 
typo in README.md doc`.
   -->
   
   ### Purpose of this pull request
   
   close https://github.com/apache/seatunnel/issues/7886
   
   ### Does this PR introduce _any_ user-facing change?
   
   no 
   
   ### How was this patch tested?
   
   case 1: column count(reader) < column count(writer)
   
   result: operation failed. according to config, seatunnel try write 2 columns 
into table but only 1 column provided(except table_name and tags)
   
   st config:
   
   ```
   env {
     execution.parallelism = 1
     job.mode = "BATCH"
   }
   
   source {
     # This is a example input plugin **only for test and demonstrate the 
feature input plugin**
     FakeSource {
       row.num = 1
       schema = {
         fields {
           table_name = int
           ts = timestamp
           tenant_id = int
         }
       }
       result_table_name = "fake"
     }
   }
   
   sink {
           TDengine {
   source_table_name='fake'
             url : "jdbc:TAOS-RS://localhost:6041/"
             username : "root"
             password : "taosdata"
             database : "test"
   fields: ['ts','latitude']
             stable : "meter"
             timezone: UTC
           }
   }
   ```
   
   test log: 
   
   ```
   Exception in thread "main" 
org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel 
job executed failed
           at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
           at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
           at 
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
java.lang.RuntimeException: java.sql.SQLException: TDengine ERROR (216): sql: 
INSERT INTO 534905432 using meter tags ( 1.9691318559072168E307 ) 
('ts','latitude') VALUES ( '2024-09-24 02:27:45.000' );, desc: syntax error 
near '1.9691318559072168e307 ) ('ts','latitude') values ( '2024-09-24' (invalid 
int data)
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:253)
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:66)
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
           at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75)
           at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
           at 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
           at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
           at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)
           at 
org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.sql.SQLException: TDengine ERROR (216): sql: INSERT INTO 
534905432 using meter tags ( 1.9691318559072168E307 ) ('ts','latitude') VALUES 
( '2024-09-24 02:27:45.000' );, desc: syntax error near '1.9691318559072168e307 
) ('ts','latitude') values ( '2024-09-24' (invalid int data)
           at com.taosdata.jdbc.TSDBError.createSQLException(TSDBError.java:76)
           at 
com.taosdata.jdbc.rs.RestfulStatement.execute(RestfulStatement.java:77)
           at 
com.taosdata.jdbc.rs.RestfulStatement.executeUpdate(RestfulStatement.java:46)
           at 
org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:109)
           at 
org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:51)
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:249)
           ... 17 more
   
           at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
   ```
   
   case 2: column count(reader) == column count(writer)
   
   result: succeed write data into database
   
   st config:
   
   ```
   env {
     execution.parallelism = 1
     job.mode = "BATCH"
   }
   
   source {
     # This is a example input plugin **only for test and demonstrate the 
feature input plugin**
     FakeSource {
       row.num = 1
       schema = {
         fields {
           table_name = int
           ts = timestamp
           lat = double
           tenant_id = int
         }
       }
       result_table_name = "fake"
     }
   }
   
   sink {
           TDengine {
   source_table_name='fake'
             url : "jdbc:TAOS-RS://localhost:6041/"
             username : "root"
             password : "taosdata"
             database : "test"
   fields: ['ts','latitude']
             stable : "meter"
             timezone: UTC
           }
   }
   ```
   
   test log:
   
   ```
   2024-10-23 20:07:54,149 DEBUG [o.a.s.c.s.f.s.FakeSourceReader] 
[hz.main.generic-operation.thread-20] - reader 0 add splits 
[FakeSourceSplit(tableId=fake, splitId=0, rowNum=1)]
   2024-10-23 20:07:54,464 DEBUG [c.h.s.i.o.i.InvocationMonitor ] 
[hz.main.InvocationMonitorThread] - [localhost]:5801 [seatunnel-868391] [5.1] 
Invocations:1 timeouts:0 backup-timeouts:0
   2024-10-23 20:07:55,098 INFO  [o.a.s.c.s.f.s.FakeSourceReader] 
[BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, 
taskGroupId=30000}] - 1 rows of data have been generated in split(fake_0) for 
table fake. Generation time: 1729685275089
   2024-10-23 20:07:55,098 INFO  [o.a.s.c.s.f.s.FakeSourceReader] 
[BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, 
taskGroupId=30000}] - Closed the bounded fake source
   2024-10-23 20:07:55,107 DEBUG [a.s.c.s.t.s.TDengineSinkWriter] 
[BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, 
taskGroupId=30000}] - sql content: INSERT INTO 1173840892 using meter tags ( 
1730702775 ) ('ts','latitude') VALUES ( '2024-08-04 
16:11:32.000',6.408192514684265E307 );
   2024-10-23 20:07:55,109 DEBUG [o.a.h.c.p.RequestAddCookies   ] 
[BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, 
taskGroupId=30000}] - CookieSpec selected: default
   2024-10-23 20:07:55,109 DEBUG [o.a.h.c.p.RequestAuthCache    ] 
[BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, 
taskGroupId=30000}] - Auth cache not set in the context
   2024-10-23 20:07:55,109 DEBUG [ingHttpClientConnectionManager] 
[BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, 
taskGroupId=30000}] - Connection request: [route: 
{}->http://10.5.2.26:6041][total available: 1; route allocated: 1 of 20; total 
allocated: 1 of 200]
   ```
   
   case 3: column count(reader) > column count(writer)
   
   result: operation failed. according to config, seatunnel shuld write 2 
columns into table but 3 column provided(except table_name and tags), currently 
seatunnel does not support find column value by column name, so a wrong sql is 
generated.
   
   st config:
   
   ```
   env {
     execution.parallelism = 1
     job.mode = "BATCH"
   }
   
   source {
     # This is a example input plugin **only for test and demonstrate the 
feature input plugin**
     FakeSource {
       row.num = 1
       schema = {
         fields {
           table_name = int
           ts = timestamp
           test_field = string
           lat = double
           tenant_id = int
         }
       }
       result_table_name = "fake"
     }
   }
   
   sink {
           TDengine {
   source_table_name='fake'
             url : "jdbc:TAOS-RS://localhost:6041/"
             username : "root"
             password : "taosdata"
             database : "test"
   fields: ['ts','latitude']
             stable : "meter"
             timezone: UTC
           }
   }
   ```
   
   test log:
   
   ```
   Exception in thread "main" 
org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel 
job executed failed
           at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
           at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
           at 
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
java.lang.RuntimeException: java.sql.SQLException: TDengine ERROR (216): sql: 
INSERT INTO 1574017200 using meter tags ( 18329617 ) ('ts','latitude') VALUES ( 
'2024-07-13 19:04:23.000','douag',1.6443365459008203E308 );, desc: syntax error 
near 'douag' (illegal double data)
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:253)
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:66)
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
           at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75)
           at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
           at 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
           at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
           at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)
           at 
org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.sql.SQLException: TDengine ERROR (216): sql: INSERT INTO 
1574017200 using meter tags ( 18329617 ) ('ts','latitude') VALUES ( '2024-07-13 
19:04:23.000','douag',1.6443365459008203E308 );, desc: syntax error near 
'douag' (illegal double data)
           at com.taosdata.jdbc.TSDBError.createSQLException(TSDBError.java:76)
           at 
com.taosdata.jdbc.rs.RestfulStatement.execute(RestfulStatement.java:77)
           at 
com.taosdata.jdbc.rs.RestfulStatement.executeUpdate(RestfulStatement.java:46)
           at 
org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:109)
           at 
org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:51)
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:249)
           ... 17 more
   
           at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
           ... 2 more
   ```
   
   ### Check list
   
   * [ ] If any new Jar binary package adding in your PR, please add License 
Notice according
     [New License 
Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new 
feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the 
following files are updated:
     1. Update 
[plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties)
 and add new connector information in it
     2. Update the pom file of 
[seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
     3. Add ci label in 
[label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
     4. Add e2e testcase in 
[seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
     5. Update connector 
[plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)
   * [ ] Update the 
[`release-note`](https://github.com/apache/seatunnel/blob/dev/release-note.md).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to