sjay1728 opened a new issue, #28248:
URL: https://github.com/apache/beam/issues/28248
I have an Apache Beam application that uses Debezium (through DebeziumIO) for
change data capture. Unlike the typical Debezium setup, I am not using Kafka
alongside it. The problem is with restarts: whenever I restart the
application, it reads all the data again from the very beginning. I want the
application to resume consuming from where it left off before the restart. I
would greatly appreciate any guidance or insights on how to achieve this. Here
is the structure of my application:
```
import io.debezium.connector.postgresql.PostgresConnector;
import org.apache.beam.io.debezium.DebeziumIO;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class PipelineTest {
  // Connector settings for a PostgreSQL source (the variable name is historical).
  private static final DebeziumIO.ConnectorConfiguration mySqlConnectorConfig =
      DebeziumIO.ConnectorConfiguration
          .create()
          .withUsername("xxxxxxxx")
          .withPassword("xxxxxxxx")
          .withHostName("localhost")
          .withPort("5432")
          .withConnectorClass(PostgresConnector.class)
          .withConnectionProperty("database.dbname", "xxxxxxxx")
          .withConnectionProperty("database.server.id", "xxxxxxxx")
          .withConnectionProperty("database.server.name", "xxxxxxxx")
          .withConnectionProperty("plugin.name", "pgoutput")
          .withConnectionProperty("slot.name", "debezium5")
          .withConnectionProperty("table.include.list", "public.table1");

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // Set the Direct Runner
    options.setRunner(org.apache.beam.runners.direct.DirectRunner.class);
    Pipeline pipeline = Pipeline.create(options);
    System.out.println("Pipeline starting...");

    // Read change events and map each SourceRecord value to a String.
    PCollection<String> records = pipeline
        .apply(DebeziumIO.<String>read()
            .withConnectorConfiguration(mySqlConnectorConfig)
            .withFormatFunction(sourceRecord -> sourceRecord.value().toString())
            .withCoder(StringUtf8Coder.of()));

    // Print every change event as it arrives.
    records.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        System.out.println("Pipeline starting 3..." + c.element());
      }
    }));

    System.out.println("Pipeline ending...");
    pipeline.run().waitUntilFinish();
  }
}
```
It seems that the following replication slot values are not changing at all
(checked with the query `SELECT * FROM pg_replication_slots;`):
catalog_xmin, restart_lsn, confirmed_flush_lsn
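A minimal sketch of reading just those columns for one slot over JDBC, so that
two checks can be compared. It assumes the PostgreSQL JDBC driver is on the
classpath and reuses the slot name `debezium5` from the configuration above.
The class name `SlotCheck`, the `parseLsn` helper, and the command-line
arguments are hypothetical and not part of Beam or Debezium.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class SlotCheck {
  // Only the columns that are expected to move as changes are confirmed.
  public static final String SLOT_QUERY =
      "SELECT slot_name, active, catalog_xmin, restart_lsn, confirmed_flush_lsn "
          + "FROM pg_replication_slots WHERE slot_name = ?";

  // Converts a textual LSN (two hexadecimal halves separated by '/') into a
  // single number so that two readings can be compared with < and >.
  public static long parseLsn(String lsn) {
    String[] parts = lsn.split("/");
    if (parts.length != 2) {
      throw new IllegalArgumentException("Not an LSN: " + lsn);
    }
    long high = Long.parseLong(parts[0], 16);
    long low = Long.parseLong(parts[1], 16);
    return (high << 32) | low;
  }

  public static void main(String[] args) throws SQLException {
    // Assumed arguments (hypothetical): database name, user, password.
    String url = "jdbc:postgresql://localhost:5432/" + args[0];
    try (Connection conn = DriverManager.getConnection(url, args[1], args[2]);
        PreparedStatement stmt = conn.prepareStatement(SLOT_QUERY)) {
      stmt.setString(1, "debezium5"); // slot.name from the connector configuration
      try (ResultSet rs = stmt.executeQuery()) {
        while (rs.next()) {
          String flushed = rs.getString("confirmed_flush_lsn");
          // confirmed_flush_lsn can be NULL, so guard before parsing.
          String numeric = flushed == null ? "n/a" : Long.toString(parseLsn(flushed));
          System.out.printf(
              "slot=%s active=%b catalog_xmin=%s restart_lsn=%s confirmed_flush_lsn=%s (%s)%n",
              rs.getString("slot_name"),
              rs.getBoolean("active"),
              rs.getString("catalog_xmin"),
              rs.getString("restart_lsn"),
              flushed,
              numeric);
        }
      }
    }
  }
}
```

Running it once before and once after a restart and comparing the numeric
value in parentheses would show whether `confirmed_flush_lsn` advanced.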
I would greatly appreciate any guidance or insights on how to achieve this.