evan766 commented on PR #4918:
URL: https://github.com/apache/seatunnel/pull/4918#issuecomment-1631908573

   > @evan766 Can you share your json configuration?
   
   thank you very much to reply me!
   steps I do, where was wrong?
   1、fork your project: 
https://github.com/Carl-Zhou-CN/incubator-seatunnel/tree/flink-cdc
   2、here is my forked project: `https://github.com/evan766/incubator-seatunnel`
   3、clone https://github.com/evan766/incubator-seatunnel to my local machine.
   4 、
   ```
   git clone [email protected]:evan766/incubator-seatunnel.git
   checkout  flink-cdc
   checkout -b  dev-cdc
   git remote add apache-seatunnel https://github.com/apache/seatunnel.git
   git merge apache-seatunnel/dev
   git push origin
   ```
   5、follow the steps in the tutorial to build binanry package: 
https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/setup.md
   ```
   #Install Subproject Locally
   ./mvnw install -Dmaven.test.skip
   
   #Building seaTunnel from source
   mvn clean package -pl seatunnel-dist -am -Dmaven.test.skip=true
   ```
   
   6、terminal  output  target in dir:  incubator-seatunnel/seatunnel-dist/target
   ```
   $: ls -l incubator-seatunnel/seatunnel-dist/target
   
   -rw-r--r--@ 1 hong  staff  1593852848 Jul 12 11:56 
apache-seatunnel-2.3.2-SNAPSHOT-bin.tar.gz
   -rw-r--r--@ 1 hong  staff     4838534 Jul 12 11:56 
apache-seatunnel-2.3.2-SNAPSHOT-src.tar.gz
   drwxr-xr-x@ 2 hong  staff          64 Jul 12 11:55 archive-tmp
   drwxr-xr-x@ 3 hong  staff          96 Jul 12 11:55 
maven-shared-archive-resources
   -rw-r--r--@ 1 hong  staff          81 Jul 12 11:55 spotless-index
   
   ```
   7、 push job to flink , flink Version: 1.16.2 
   ```
   tar zxvf apache-seatunnel-2.3.2-SNAPSHOT-bin.tar.gz
   bash bin/start-seatunnel-flink-15-connector-v2.sh --config 
config/cdc.template
   ```
   cdc.templte
   ```
   env {
     execution.parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 10000
   }
   
   source {
     MySQL-CDC {
     debezium {
             poll.interval.ms = 1000
         }
       result_table_name = "temp_organization"
       parallelism = 1
       server-id = 5656
       username = "root"
       password = "root"
       table-names = ["testbeat.organization"]
       base-url = "jdbc:mysql://192.168.66.227:3306/testbeat"
     }
   }
   
   sink {
       jdbc {
             source_table_name:"temp_organization"
             url = "jdbc:mysql://192.168.66.227:3306/seatunnel"
             driver = "com.mysql.cj.jdbc.Driver"
             user = "root"
             password = "root"
             generate_sink_sql = "true"
             database = "seatunnel"
             table = "organization2"
             primary_keys = ["id"]
       }
   }
   ```
   8、flink job failed:
   
   ```
   2023-07-12 14:10:02
   java.lang.UnsupportedOperationException: Flink ParallelSource don't support 
sending SourceEvent. Please implement the `SupportCoordinate` marker interface 
on the SeaTunnel source.
        at 
org.apache.seatunnel.translation.source.ParallelReaderContext.sendSourceEventToEnumerator(ParallelReaderContext.java:61)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.reportFinishedSnapshotSplitsIfNeed(IncrementalSourceReader.java:151)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.onSplitFinished(IncrementalSourceReader.java:133)
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:204)
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:180)
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:161)
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:92)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:92)
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:92)
        at 
org.apache.seatunnel.translation.source.ParallelSource.run(ParallelSource.java:136)
        at 
org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction.run(BaseSeaTunnelSourceFunction.java:86)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
   
   ```
   
![image](https://github.com/apache/seatunnel/assets/30330163/cf825655-5afd-404c-82e7-f789953a15d9)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to