Sergei Morozov created FLINK-37852:
--------------------------------------

             Summary: SQL Server CDC cannot capture changes from tables with 
special characters
                 Key: FLINK-37852
                 URL: https://issues.apache.org/jira/browse/FLINK-37852
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.4.0
            Reporter: Sergei Morozov


Configure a connector against a table containing an opening square bracket in 
its name (e.g. {{customers [1]}}).
h5. Expected behavior

The connector captures the changes as usual.
h5. Actual behavior

The connector fails to introspect the table schema and fails as follows:

{noformat}org.apache.flink.runtime.source.coordinator.SourceCoordinator - 
Uncaught exception in the SplitEnumerator for Source Source: customers[1] while 
handling operator event RequestSplitEvent (host='localhost') from subtask 0 
(#0). Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Chunk splitting has encountered 
exception
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.checkSplitterErrors(SnapshotSplitAssigner.java:687)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:402)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:182)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:219)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:117)
 ~[classes/:?]
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:637)
 ~[flink-runtime-1.20.1.jar:1.20.1]
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:306)
 ~[flink-runtime-1.20.1.jar:1.20.1]
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:533)
 ~[flink-runtime-1.20.1.jar:1.20.1]
        at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
 [flink-core-1.20.1.jar:1.20.1]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_452]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_452]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_452]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_452]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_452]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_452]
Caused by: java.lang.IllegalStateException: Error when splitting chunks for 
customer.dbo.customers [1]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitTable(SnapshotSplitAssigner.java:361)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitChunksForRemainingTables(SnapshotSplitAssigner.java:670)
 ~[classes/:?]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_452]
        ... 3 more
Caused by: java.lang.RuntimeException: Fail to analyze table in chunk splitter.
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.analyzeTable(JdbcSourceChunkSplitter.java:368)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.generateSplits(JdbcSourceChunkSplitter.java:112)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitTable(SnapshotSplitAssigner.java:359)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitChunksForRemainingTables(SnapshotSplitAssigner.java:670)
 ~[classes/:?]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_452]
        ... 3 more
Caused by: org.apache.flink.table.api.ValidationException: To use incremental 
snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the 
table doesn't have primary keys.
        at 
org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils.getSplitColumn(JdbcChunkUtils.java:113)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.getSplitColumn(JdbcSourceChunkSplitter.java:260)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.analyzeTable(JdbcSourceChunkSplitter.java:363)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.generateSplits(JdbcSourceChunkSplitter.java:112)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitTable(SnapshotSplitAssigner.java:359)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitChunksForRemainingTables(SnapshotSplitAssigner.java:670)
 ~[classes/:?]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_452]
        ... 3 more
25337 [SourceCoordinator-Source: customers[1]] ERROR 
org.apache.flink.runtime.source.coordinator.SourceCoordinator - Uncaught 
exception in the SplitEnumerator for Source Source: customers[1] while handling 
operator event RequestSplitEvent (host='localhost') from subtask 0 (#1). 
Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Chunk splitting has encountered 
exception
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.checkSplitterErrors(SnapshotSplitAssigner.java:687)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:402)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:182)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:219)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:117)
 ~[classes/:?]
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:637)
 ~[flink-runtime-1.20.1.jar:1.20.1]
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:306)
 ~[flink-runtime-1.20.1.jar:1.20.1]
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:533)
 ~[flink-runtime-1.20.1.jar:1.20.1]
        at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
 [flink-core-1.20.1.jar:1.20.1]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_452]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_452]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_452]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_452]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_452]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_452]
Caused by: java.lang.IllegalStateException: Error when splitting chunks for 
customer.dbo.customers [1]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitTable(SnapshotSplitAssigner.java:361)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitChunksForRemainingTables(SnapshotSplitAssigner.java:670)
 ~[classes/:?]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_452]
        ... 3 more
Caused by: java.lang.RuntimeException: Fail to analyze table in chunk splitter.
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.analyzeTable(JdbcSourceChunkSplitter.java:368)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.generateSplits(JdbcSourceChunkSplitter.java:112)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitTable(SnapshotSplitAssigner.java:359)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitChunksForRemainingTables(SnapshotSplitAssigner.java:670)
 ~[classes/:?]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_452]
        ... 3 more
Caused by: org.apache.flink.table.api.ValidationException: To use incremental 
snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the 
table doesn't have primary keys.
        at 
org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils.getSplitColumn(JdbcChunkUtils.java:113)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.getSplitColumn(JdbcSourceChunkSplitter.java:260)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.analyzeTable(JdbcSourceChunkSplitter.java:363)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.generateSplits(JdbcSourceChunkSplitter.java:112)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitTable(SnapshotSplitAssigner.java:359)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitChunksForRemainingTables(SnapshotSplitAssigner.java:670)
 ~[classes/:?]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
~[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_452]
        ... 3 more

java.lang.RuntimeException: Failed to fetch next result

        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:129)
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:100)
        at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:247)
        at 
org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceITCase.fetchRows(SqlServerSourceITCase.java:565)
        at 
org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceITCase.testSqlServerParallelSource(SqlServerSourceITCase.java:492)
        at 
org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceITCase.testSqlServerParallelSource(SqlServerSourceITCase.java:383)
        at 
org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceITCase.testReadSingleTableWithSingleParallelism(SqlServerSourceITCase.java:73)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
Caused by: java.io.IOException: Failed to fetch job execution result
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:187)
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:123)
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:126)
        ... 9 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)
        ... 11 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at 
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
        at 
java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
        at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
        ... 11 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, 
backoffTimeMS=0)
        at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
        at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
        at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:140)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:324)
        at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:669)
        at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:460)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:460)
        at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:225)
        at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
        at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
        at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
        at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
        at 
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
        at 
java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
        at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'Source: customers[1] -> ConstraintEnforcer[2] -> Sink: 
Collect table sink' (operator cbc357ccb763df2852fee8c4fc7d55f2).
        at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:651)
        at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:259)
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:432)
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:547)
        at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
        at java.util.concurrent.FutureTask.run(FutureTask.java)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkRuntimeException: Chunk splitting has 
encountered exception
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.checkSplitterErrors(SnapshotSplitAssigner.java:687)
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:402)
        at 
org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:182)
        at 
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:219)
        at 
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:117)
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:637)
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:306)
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:533)
        ... 9 more
Caused by: java.lang.IllegalStateException: Error when splitting chunks for 
customer.dbo.customers [1]
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitTable(SnapshotSplitAssigner.java:361)
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitChunksForRemainingTables(SnapshotSplitAssigner.java:670)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
        at java.util.concurrent.FutureTask.run(FutureTask.java)
        ... 3 more
Caused by: java.lang.RuntimeException: Fail to analyze table in chunk splitter.
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.analyzeTable(JdbcSourceChunkSplitter.java:368)
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.generateSplits(JdbcSourceChunkSplitter.java:112)
        at 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.splitTable(SnapshotSplitAssigner.java:359)
        ... 7 more
Caused by: org.apache.flink.table.api.ValidationException: To use incremental 
snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the 
table doesn't have primary keys.
        at 
org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils.getSplitColumn(JdbcChunkUtils.java:113)
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.getSplitColumn(JdbcSourceChunkSplitter.java:260)
        at 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.analyzeTable(JdbcSourceChunkSplitter.java:363)
        ... 9 more
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to