This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9ff96dfd329 Update Golang - DebeziumIO 3.1.3 (#37667)
9ff96dfd329 is described below
commit 9ff96dfd3299cd3604bb4a0c1aaeceb3bddb4d85
Author: Tobias Kaymak <[email protected]>
AuthorDate: Sat Feb 21 00:31:25 2026 +0100
Update Golang - DebeziumIO 3.1.3 (#37667)
---
sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go | 8 ++++++++
sdks/go/test/integration/io/xlang/debezium/debezium.go | 3 ++-
sdks/go/test/integration/io/xlang/debezium/debezium_test.go | 3 +--
.../org/apache/beam/io/debezium/DebeziumTransformRegistrar.java | 9 +++++++++
4 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
index eb43fd74175..547aba0ceb9 100644
--- a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
+++ b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
@@ -75,6 +75,7 @@ type readFromDebeziumSchema struct {
Host string
Port string
MaxNumberOfRecords *int64
+ MaxTimeToRun *int64
ConnectionProperties []string
}
@@ -133,6 +134,13 @@ func MaxRecord(r int64) readOption {
}
}
+// MaxTimeToRun specifies maximum number of milliseconds to run before stop.
+func MaxTimeToRun(r int64) readOption {
+ return func(cfg *debeziumConfig) {
+ cfg.readSchema.MaxTimeToRun = &r
+ }
+}
+
// ConnectionProperties specifies properties of the debezium connection passed
as
// a string with format [propertyName=property;]*
func ConnectionProperties(cp []string) readOption {
diff --git a/sdks/go/test/integration/io/xlang/debezium/debezium.go
b/sdks/go/test/integration/io/xlang/debezium/debezium.go
index 26e4d974abf..e1b9bab963c 100644
--- a/sdks/go/test/integration/io/xlang/debezium/debezium.go
+++ b/sdks/go/test/integration/io/xlang/debezium/debezium.go
@@ -29,8 +29,9 @@ func ReadPipeline(addr, username, password, dbname, host,
port string, connector
p, s := beam.NewPipelineWithRoot()
result := debeziumio.Read(s.Scope("Read from debezium"), username,
password, host, port,
connectorClass, reflectx.String,
debeziumio.MaxRecord(maxrecords),
+ debeziumio.MaxTimeToRun(120000),
debeziumio.ConnectionProperties(connectionProperties),
debeziumio.ExpansionAddr(addr))
- expectedJson :=
`{"metadata":{"connector":"postgresql","version":"1.3.1.Final","name":"dbserver1","database":"inventory","schema":"inventory","table":"customers"},"before":null,"after":{"fields":{"last_name":"Thomas","id":1001,"first_name":"Sally","email":"[email protected]"}}}`
+ expectedJson :=
`{"metadata":{"connector":"postgresql","version":"3.1.3.Final","name":"beam-debezium-connector","database":"inventory","schema":"inventory","table":"customers"},"before":null,"after":{"fields":{"last_name":"Thomas","id":1001,"first_name":"Sally","email":"[email protected]"}}}`
expected := beam.Create(s, expectedJson)
passert.Equals(s, result, expected)
return p
diff --git a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go
b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go
index 208a062f943..a4850d4a3a3 100644
--- a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go
+++ b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go
@@ -34,7 +34,7 @@ import (
)
const (
- debeziumImage = "quay.io/debezium/example-postgres:latest"
+ debeziumImage = "quay.io/debezium/example-postgres:3.1.3.Final"
debeziumPort = "5432/tcp"
maxRetries = 5
)
@@ -82,7 +82,6 @@ func TestDebeziumIO_BasicRead(t *testing.T) {
connectionProperties := []string{
"database.dbname=inventory",
"database.server.name=dbserver1",
- "database.include.list=inventory",
"include.schema.changes=false",
}
read := ReadPipeline(expansionAddr, username, password, dbname, host,
port, debeziumio.PostgreSQL, 1, connectionProperties)
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
index eb6732180b0..22a34ae2654 100644
---
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
@@ -77,6 +77,7 @@ public class DebeziumTransformRegistrar implements
ExternalTransformRegistrar {
public static class Configuration extends CrossLanguageConfiguration {
private @Nullable List<String> connectionProperties;
private @Nullable Long maxNumberOfRecords;
+ private @Nullable Long maxTimeToRun;
public void setConnectionProperties(@Nullable List<String>
connectionProperties) {
this.connectionProperties = connectionProperties;
@@ -85,6 +86,10 @@ public class DebeziumTransformRegistrar implements
ExternalTransformRegistrar {
public void setMaxNumberOfRecords(@Nullable Long maxNumberOfRecords) {
this.maxNumberOfRecords = maxNumberOfRecords;
}
+
+ public void setMaxTimeToRun(@Nullable Long maxTimeToRun) {
+ this.maxTimeToRun = maxTimeToRun;
+ }
}
@Override
@@ -114,6 +119,10 @@ public class DebeziumTransformRegistrar implements
ExternalTransformRegistrar {
readTransform.withMaxNumberOfRecords(configuration.maxNumberOfRecords.intValue());
}
+ if (configuration.maxTimeToRun != null) {
+ readTransform =
readTransform.withMaxTimeToRun(configuration.maxTimeToRun);
+ }
+
return readTransform;
}
}