Simon Tao created FLINK-22089:
---------------------------------
Summary: cdc checkpoint invalid
Key: FLINK-22089
URL: https://issues.apache.org/jira/browse/FLINK-22089
Project: Flink
Issue Type: Bug
Components: Connectors / Common, Runtime / Checkpointing
Affects Versions: 1.12.0
Environment: public static void main(String[] args) {
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("xxx")
.port(3306)
.databaseList("xxx") // monitor all tables under inventory database
.username("xxx")
.password("xxx")
.deserializer(new StringDebeziumDeserializationSchema()) // converts
SourceRecord to String
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
try
{ env.setStateBackend(new RocksDBStateBackend("file:///E:/tmp/t2")); }
catch (IOException e)
{ e.printStackTrace(); }
env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1
for sink to keep message ordering
try \{ env.execute(); } catch (Exception e) \{ e.printStackTrace(); }
}
====================maven=========================
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-test-utils -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--
https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc
-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.3.0</version>
</dependency>
<!--
https://mvnrepository.com/artifact/ru.ivi.opensource/flink-clickhouse-sink -->
<dependency>
<groupId>ru.ivi.opensource</groupId>
<artifactId>flink-clickhouse-sink</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
Reporter: Simon Tao
Fix For: shaded-13.0
i turn on checkpoint but it seems invalid , during the whole process ,the
checkpoint file can write to localfile and the flink can read the cdc recrods
normally, but when i restart flink in idea ,it always reload consumed records .
i paste on my code and maven configuration below
--
This message was sent by Atlassian Jira
(v8.3.4#803005)