hksharma-ai commented on issue #4090: URL: https://github.com/apache/flink-cdc/issues/4090#issuecomment-3175805979
Hi Team, As per the documentation, I am trying to create a Flink CDC connector with the DataStream API, but I do not see any records arriving or being printed. Here is the code and POM file. Java Code: `import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import java.time.Duration; import java.util.Properties; public class PostgresParallelSourceExample { public static void main(String[] args) throws Exception { DebeziumDeserializationSchema<String> deserializer = new JsonDebeziumDeserializationSchema(); Properties debeziumProperties = new Properties(); debeziumProperties.setProperty("snapshot.mode", "initial"); debeziumProperties.setProperty("snapshot.fetch.size", "100"); debeziumProperties.setProperty("snapshot.lock.timeout.ms", "10000"); debeziumProperties.setProperty("decimal.handling.mode", "string"); debeziumProperties.setProperty("include.schema.changes", "false"); debeziumProperties.setProperty("provide.transaction.metadata", "false"); debeziumProperties.setProperty("max.queue.size", "8192"); debeziumProperties.setProperty("max.batch.size", "1024"); debeziumProperties.setProperty("snapshot.mode", "always"); JdbcIncrementalSource<String> postgresIncrementalSource = PostgresSourceBuilder.PostgresIncrementalSource.<String>builder() .hostname("localhost") .port(5432) .database("postgres") .schemaList("public") .tableList("public.flink_shipments") .username("postgres") .password("postgres") .slotName("flink_shipments") .decodingPluginName("pgoutput") // use pgoutput for PostgreSQL 10+ .deserializer(deserializer) .splitSize(2) 
.debeziumProperties(debeziumProperties) .heartbeatInterval(Duration.ofSeconds(30)) // the split size of each snapshot split .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(3000); env.fromSource( postgresIncrementalSource, WatermarkStrategy.noWatermarks(), "PostgresParallelSource") .setParallelism(1).map(value -> { System.out.println("CDC Event: " + value); return value; }).addSink(new PrintSinkFunction<>()).name("PrintSink"); env.execute("Output Postgres Snapshot"); }` POM File `<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>og-flink-cdc-incremental</artifactId> <version>1.0-SNAPSHOT</version> <properties> <!-- Use a single, consistent Flink version --> <flink.version>1.20.0</flink.version> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <scala.binary.version>2.12</scala.binary.version> <flink-cdc.version>3.4.0</flink-cdc.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- Flink CDC dependencies --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-postgres-cdc</artifactId> <version>${flink-cdc.version}</version> </dependency> <!-- This dependency is required for the RecordEmitter class --> <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cdc-base</artifactId> <version>${flink-cdc.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>org.apache.flink:flink-shaded-guava</exclude> </excludes> </artifactSet> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>org.example.PostgresParallelSourceExample</mainClass> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project> ` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org