[ https://issues.apache.org/jira/browse/FLINK-36223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zjf updated FLINK-36223:
------------------------
    Description: 
1. An error occurs with a dynamically added SQL Server table. It is triggered as follows: after a checkpoint has been saved, I add a new table to my Flink CDC job, and the exception occurs when the CDC job is restored from that checkpoint.
2. The error log is as follows:

2024-09-04 18:08:56,577 INFO tracer[] [debezium-reader-0] i.d.c.s.SqlServerStreamingChangeEventSource:? - CDC is enabled for table Capture instance "T_BD_SUPPLIER_L" [sourceTableId=AIS20231222100348.dbo.T_BD_SUPPLIER_L, changeTableId=AIS20231222100348.cdc.T_BD_SUPPLIER_L_CT, startLsn=000abdbd:0000192b:0001, changeTableObjectId=627568271, stopLsn=NULL] but the table is not whitelisted by connector
2024-09-04 18:08:56,947 ERROR tracer[] [Source Data Fetcher for Source: kingdee-cdc-supply_test-source (1/1)#0|#0] o.a.f.c.b.s.r.f.SplitFetcherManager:? - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:459)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:138)
    at com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$StreamSplitReadTask.execute(SqlServerStreamFetchTask.java:161)
    at com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.execute(SqlServerStreamFetchTask.java:69)
    at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
    ... 6 common frames omitted
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getString(Struct.java:158)
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeRecordValue(JdbcSourceEventDispatcher.java:193)
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(JdbcSourceEventDispatcher.java:223)
    at io.debezium.connector.sqlserver.SqlServerSchemaChangeEventEmitter.emitSchemaChangeEvent(SqlServerSchemaChangeEventEmitter.java:47)
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:147)
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:62)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.getChangeTablesToQuery(SqlServerStreamingChangeEventSource.java:581)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:237)
    ... 10 common frames omitted
3. The Maven configuration I use is as follows:

<properties>
    <flink.version>1.19.1</flink.version>
    <sql-connector.version>3.0.1</sql-connector.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>compile</scope> -->
    </dependency>
    <!-- flink-cdc-base is versioned with ${flink.version} (1.19.1), while the other com.ververica CDC artifacts use ${sql-connector.version} (3.0.1). -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-cdc-base</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>${sql-connector.version}</version>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
        <version>${sql-connector.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- The Kafka and Pulsar connector versions below carry the -1.18 suffix (built for Flink 1.18), while flink.version is 1.19.1. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.1.0-1.18</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-pulsar</artifactId>
        <version>4.1.0-1.18</version>
    </dependency>
</dependencies>

The cause is that Flink CDC restores from the serialized checkpoint, which contains only table A. When I add table B and data in table B changes, this exception occurs, because table B is not included in the state serialized in the earlier checkpoint. Below are screenshots from my debugging:

After adding the new table, I could not find it here:

!image-2024-09-05-14-17-56-066.png!
this.schema.tableFor(currentTable.getSourceTableId()) == null
My guess is that this check is meant for the case where a table cannot be found in the state restored from the checkpoint: the newly added table should then be initialized and added to the cache.
!https://github-production-user-asset-6210df.s3.amazonaws.com/102848709/364333834-ab3141ee-e301-4387-a7ac-d1ee807cbe7c.png?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240905%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240905T061127Z&X-Amz-Expires=300&X-Amz-Signature=048530026540a9ad812d0c3b55c8d465aed9fa5213ff3f7a1a77e223536307e3&X-Amz-SignedHeaders=host&actor_id=102848709&key_id=0&repo_id=282994686!
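A minimal sketch of the behavior guessed above, under stated assumptions: the restored schema is modeled as a map from table id to column names, and a hypothetical loader function stands in for reading a table definition from the database. RestoredSchemaCache, tableForOrLoad and the table ids dbo.A and dbo.B are illustrative names only; the tableFor method merely mirrors the name of the call in the screenshot and is not the Debezium or Flink CDC implementation. It uses Arrays.asList instead of List.of so it also compiles on JDK 1.8.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: models the restored schema as a map; not Debezium or Flink CDC code.
public class RestoredSchemaCache {
    // Table id -> column names, standing in for the schema restored from checkpoint state.
    private final Map<String, List<String>> tables = new HashMap<>();
    // Hypothetical loader standing in for reading a table definition from the database.
    private final Function<String, List<String>> loader;

    public RestoredSchemaCache(Map<String, List<String>> restored,
                               Function<String, List<String>> loader) {
        this.tables.putAll(restored);
        this.loader = loader;
    }

    // Plain lookup: null for any table that is not in the restored state.
    public List<String> tableFor(String tableId) {
        return tables.get(tableId);
    }

    // Lazy variant: loads a missing table once, then serves it from the cache.
    public List<String> tableForOrLoad(String tableId) {
        return tables.computeIfAbsent(tableId, loader);
    }

    public static void main(String[] args) {
        Map<String, List<String>> restored = new HashMap<>();
        restored.put("dbo.A", Arrays.asList("id", "name")); // checkpoint only knows table A

        RestoredSchemaCache cache = new RestoredSchemaCache(
                restored, tableId -> Arrays.asList("id", "supplier_name"));

        System.out.println(cache.tableFor("dbo.B"));       // null: B was added after the checkpoint
        System.out.println(cache.tableForOrLoad("dbo.B")); // [id, supplier_name]
        System.out.println(cache.tableFor("dbo.B"));       // [id, supplier_name], now cached
    }
}
```

The plain lookup returns null for a table added after the checkpoint; the lazy variant initializes it once via computeIfAbsent and then serves it from the cache.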

!image-2024-09-05-14-23-48-144.png!

But I don't understand this part of the code. The schema held by the Struct does not contain a file field, so why is it read at all? Could there be a problem with the package versions I use? A similar issue occurred before with flink-sql-connector-mysql-cdc, where it can be solved by setting scanNewlyAddedTableEnabled(true) on MySqlSourceBuilder. However, I could not find this method on the sqlServerSourceBuilder in flink-sql-connector-sqlserver-cdc.

!image-2024-09-05-14-37-46-581.png!

This code appears to be what reads the file field:

!image-2024-09-05-14-38-30-542.png!
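The stack trace shows Struct.getString failing inside JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeRecordValue with "file is not a valid field name". The stdlib-only stand-in below models just that strict lookup: reading a field the schema does not declare is an error rather than null. SourceStruct is a hypothetical class, not the Kafka Connect Struct, and its field list is an assumption modeled on Debezium's SQL Server source block, which carries LSN fields such as change_lsn and commit_lsn rather than MySQL's binlog file and pos. A defensive variant that checks the schema first is included for comparison.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Kafka Connect's Struct field lookup; not the real class.
public class SourceStruct {
    private final Set<String> fieldNames;
    private final Map<String, Object> values = new HashMap<>();

    public SourceStruct(String... fieldNames) {
        this.fieldNames = new HashSet<>(Arrays.asList(fieldNames));
    }

    public SourceStruct put(String name, Object value) {
        requireField(name);
        values.put(name, value);
        return this;
    }

    // Strict lookup: a field the schema does not declare is an error, not null.
    public String getString(String name) {
        requireField(name);
        return (String) values.get(name);
    }

    // Defensive variant: check the schema first and return null if the field is absent.
    public String getStringIfPresent(String name) {
        return fieldNames.contains(name) ? (String) values.get(name) : null;
    }

    private void requireField(String name) {
        if (!fieldNames.contains(name)) {
            // The real Kafka Connect Struct throws a DataException with this message here.
            throw new IllegalArgumentException(name + " is not a valid field name");
        }
    }

    public static void main(String[] args) {
        // Assumed SQL Server-style source fields: LSNs, no binlog "file"/"pos".
        SourceStruct source = new SourceStruct("db", "schema", "table", "change_lsn", "commit_lsn")
                .put("table", "B");

        System.out.println(source.getStringIfPresent("file")); // null
        try {
            source.getString("file"); // a binlog-style "file" lookup on a SQL Server-style struct
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // file is not a valid field name
        }
    }
}
```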

Because the field cannot be found, an exception is thrown here, and in practice the job keeps failing with this error in a loop.

!image-2024-09-05-14-38-49-424.png!

!image-2024-09-05-14-39-07-070.png!
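A hypothetical simulation of why the error repeats, assuming the job is configured to restart and every restart restores the same checkpointed position. Because the restored state is identical on each attempt, the same change for the new table is replayed and fails at the same point. RestartLoop, replay and runWithRestarts are illustrative names, not Flink APIs, and the event list is made up.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of a restart loop; names are illustrative, not Flink APIs.
public class RestartLoop {
    // Replays change events from the checkpointed offset; fails on a table the restored state lacks.
    public static void replay(List<String> eventTables, int checkpointOffset, Set<String> knownTables) {
        for (int i = checkpointOffset; i < eventTables.size(); i++) {
            String table = eventTables.get(i);
            if (!knownTables.contains(table)) {
                throw new IllegalStateException("unknown table " + table + " at offset " + i);
            }
        }
    }

    // Restarts up to maxAttempts times from the same checkpoint; returns how many attempts failed.
    public static int runWithRestarts(List<String> eventTables, int checkpointOffset,
                                      Set<String> knownTables, int maxAttempts) {
        int failures = 0;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                replay(eventTables, checkpointOffset, knownTables);
                return failures; // replay completed, so no further restarts
            } catch (IllegalStateException e) {
                failures++;
                System.out.println("attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> eventTables = Arrays.asList("A", "B", "A"); // table touched by each change event
        Set<String> restored = new HashSet<>(Arrays.asList("A")); // checkpoint only knows table A
        System.out.println("failures: " + runWithRestarts(eventTables, 0, restored, 3));
    }
}
```

Running main prints "attempt 1 failed: unknown table B at offset 1" (and the same for attempts 2 and 3), followed by "failures: 3". Nothing changes between attempts, so retrying alone cannot get past the new table.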

Unless this problem is resolved, I cannot restore from the last checkpoint or savepoint and have to delete it. Any data modifications made in the database in the meantime are then not captured.


> SplitFetcher thread 0 received unexpected exception while polling the records
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-36223
>                 URL: https://issues.apache.org/jira/browse/FLINK-36223
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.1
>         Environment: # JDK 1.8
>  # SQL SERVER 2019
>  # FLINK CDC 3X
>            Reporter: zjf
>            Priority: Major
>         Attachments: image-2024-09-05-14-17-56-066.png, 
> image-2024-09-05-14-23-48-144.png, image-2024-09-05-14-35-58-759.png, 
> image-2024-09-05-14-36-12-672.png, image-2024-09-05-14-37-46-581.png, 
> image-2024-09-05-14-38-30-542.png, image-2024-09-05-14-38-49-424.png, 
> image-2024-09-05-14-39-07-070.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
