hksharma-ai commented on issue #4090:
URL: https://github.com/apache/flink-cdc/issues/4090#issuecomment-3175805979

Hi Team,
As per the documentation, I am trying to create a Flink CDC connector with the DataStream API, but I do not see any records coming through or being printed.
Here are the Java code and the POM file.
   
Java Code

```java
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

import java.time.Duration;
import java.util.Properties;

public class PostgresParallelSourceExample {

    public static void main(String[] args) throws Exception {

        DebeziumDeserializationSchema<String> deserializer =
                new JsonDebeziumDeserializationSchema();

        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("snapshot.mode", "initial");
        debeziumProperties.setProperty("snapshot.fetch.size", "100");
        debeziumProperties.setProperty("snapshot.lock.timeout.ms", "10000");
        debeziumProperties.setProperty("decimal.handling.mode", "string");
        debeziumProperties.setProperty("include.schema.changes", "false");
        debeziumProperties.setProperty("provide.transaction.metadata", "false");
        debeziumProperties.setProperty("max.queue.size", "8192");
        debeziumProperties.setProperty("max.batch.size", "1024");
        // note: this overrides the "initial" value set above
        debeziumProperties.setProperty("snapshot.mode", "always");

        JdbcIncrementalSource<String> postgresIncrementalSource =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .hostname("localhost")
                        .port(5432)
                        .database("postgres")
                        .schemaList("public")
                        .tableList("public.flink_shipments")
                        .username("postgres")
                        .password("postgres")
                        .slotName("flink_shipments")
                        .decodingPluginName("pgoutput") // use pgoutput for PostgreSQL 10+
                        .deserializer(deserializer)
                        .splitSize(2) // the split size of each snapshot split
                        .debeziumProperties(debeziumProperties)
                        .heartbeatInterval(Duration.ofSeconds(30))
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(3000);

        env.fromSource(
                        postgresIncrementalSource,
                        WatermarkStrategy.noWatermarks(),
                        "PostgresParallelSource")
                .setParallelism(1)
                .map(value -> {
                    System.out.println("CDC Event: " + value);
                    return value;
                })
                .addSink(new PrintSinkFunction<>())
                .name("PrintSink");

        env.execute("Output Postgres Snapshot");
    }
}
```
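
For reference, below is a stripped-down sketch of the same pipeline (same placeholder connection settings as above; the class name `PostgresPrintSketch` is only for illustration) that swaps the map + `PrintSinkFunction` chain for the built-in `print()` sink. Note that both `System.out.println` and `print()` write to the stdout of the TaskManager that runs the task, so when the job is submitted to a cluster the output appears in the TaskManager's `.out` log rather than in the client console.

```java
// Minimal sketch, not a fix: same connection settings as the example above,
// with the custom map + PrintSinkFunction replaced by DataStream.print().
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PostgresPrintSketch {
    public static void main(String[] args) throws Exception {
        JdbcIncrementalSource<String> source =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .hostname("localhost")
                        .port(5432)
                        .database("postgres")
                        .schemaList("public")
                        .tableList("public.flink_shipments")
                        .username("postgres")
                        .password("postgres")
                        .slotName("flink_shipments")
                        .decodingPluginName("pgoutput")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // keep checkpointing enabled, as in the example above

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "PostgresParallelSource")
                .setParallelism(1)
                .print(); // goes to the TaskManager's stdout (taskmanager .out file on a cluster)

        env.execute("Print Postgres CDC events");
    }
}
```

If this stripped-down version also prints nothing, the problem is presumably upstream of the sink (the source never emitting records) rather than in the map/sink chain.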
   
   
   
POM File

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>og-flink-cdc-incremental</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Use a single, consistent Flink version -->
        <flink.version>1.20.0</flink.version>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.binary.version>2.12</scala.binary.version>
        <flink-cdc.version>3.4.0</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink CDC dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>

        <!-- This dependency is required for the RecordEmitter class -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-base</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-guava</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.PostgresParallelSourceExample</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

