sjay1728 opened a new issue, #28248:
URL: https://github.com/apache/beam/issues/28248

   I have an Apache Beam application that uses Debezium to capture data 
changes from PostgreSQL directly, without Kafka. Whenever I restart the 
application, it reads all the data again from the very beginning. My objective 
is for the application to resume consuming data from where it left off before 
the restart. I would greatly appreciate any guidance or insights on how to 
achieve this. Here is the structure of my application:
   
   ```
   import io.debezium.connector.postgresql.PostgresConnector;
   import org.apache.beam.io.debezium.DebeziumIO;
   import org.apache.beam.runners.direct.DirectRunner;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.coders.StringUtf8Coder;
   import org.apache.beam.sdk.options.PipelineOptions;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.ParDo;
   import org.apache.beam.sdk.values.PCollection;
   
   public class PipelineTest {
       // Debezium connector settings for the PostgreSQL source (credentials redacted).
       private static final DebeziumIO.ConnectorConfiguration postgresConnectorConfig =
               DebeziumIO.ConnectorConfiguration.create()
                       .withUsername("xxxxxxxx")
                       .withPassword("xxxxxxxx")
                       .withHostName("localhost")
                       .withPort("5432")
                       .withConnectorClass(PostgresConnector.class)
                       .withConnectionProperty("database.dbname", "xxxxxxxx")
                       .withConnectionProperty("database.server.id", "xxxxxxxx")
                       .withConnectionProperty("database.server.name", "xxxxxxxx")
                       .withConnectionProperty("plugin.name", "pgoutput")
                       .withConnectionProperty("slot.name", "debezium5")
                       .withConnectionProperty("table.include.list", "public.table1");
   
       public static void main(String[] args) {
           PipelineOptions options = PipelineOptionsFactory.create();
           options.setRunner(DirectRunner.class); // Set the Direct Runner
           Pipeline pipeline = Pipeline.create(options);
   
           System.out.println("Pipeline starting...");
   
           // Read change events and convert each SourceRecord value to a String.
           PCollection<String> records = pipeline
                   .apply(DebeziumIO.<String>read()
                           .withConnectorConfiguration(postgresConnectorConfig)
                           .withFormatFunction(sourceRecord ->
                                   sourceRecord.value().toString())
                           .withCoder(StringUtf8Coder.of())
                   );
   
           // Print every change event as it arrives.
           records.apply(ParDo.of(new DoFn<String, String>() {
               @ProcessElement
               public void processElement(ProcessContext c) {
                   System.out.println("Pipeline starting 3..." + c.element());
               }
           }));
   
           System.out.println("Pipeline ending...");
   
           pipeline.run().waitUntilFinish();
       }
   }
   ```
   It seems that the following replication slot values are not changing at 
all: `catalog_xmin`, `restart_lsn`, and `confirmed_flush_lsn`. I checked them 
with the query `SELECT * FROM pg_replication_slots;`.
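
To make the "not changing" observation concrete, two readings of `restart_lsn` or `confirmed_flush_lsn` can be compared numerically. The following is a minimal sketch, assuming the standard PostgreSQL LSN text form of two hexadecimal numbers separated by a slash. The class `LsnCompare`, its methods, and the sample values are hypothetical helpers for illustration and are not part of Beam or Debezium.

```java
import java.util.Objects;

/** Hypothetical helper (not part of Beam or Debezium) for comparing PostgreSQL LSN strings. */
public class LsnCompare {

    /** Converts an LSN such as "0/16B3748" into a single 64-bit position. */
    public static long parseLsn(String lsn) {
        Objects.requireNonNull(lsn, "lsn");
        String[] parts = lsn.trim().split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not an LSN: " + lsn);
        }
        // The part before the slash is the high 32 bits, the part after it the low 32 bits.
        long high = Long.parseUnsignedLong(parts[0], 16);
        long low = Long.parseUnsignedLong(parts[1], 16);
        return (high << 32) | low;
    }

    /** Returns true when the second reading is strictly ahead of the first. */
    public static boolean hasAdvanced(String before, String after) {
        return Long.compareUnsigned(parseLsn(after), parseLsn(before)) > 0;
    }

    public static void main(String[] args) {
        // Sample values for illustration only; substitute readings from pg_replication_slots.
        String beforeRestart = "0/16B3748";
        String afterRestart = "0/16B3748";
        System.out.println("advanced = " + hasAdvanced(beforeRestart, afterRestart));
    }
}
```

With identical readings, as in the sample values, `hasAdvanced` returns `false`, which matches what I see for the slot.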
   
   I would greatly appreciate any guidance or insights on how to achieve this.
   

