This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ff96dfd329 Update Golang - DebeziumIO 3.1.3 (#37667)
9ff96dfd329 is described below

commit 9ff96dfd3299cd3604bb4a0c1aaeceb3bddb4d85
Author: Tobias Kaymak <[email protected]>
AuthorDate: Sat Feb 21 00:31:25 2026 +0100

    Update Golang - DebeziumIO 3.1.3 (#37667)
---
 sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go                 | 8 ++++++++
 sdks/go/test/integration/io/xlang/debezium/debezium.go           | 3 ++-
 sdks/go/test/integration/io/xlang/debezium/debezium_test.go      | 3 +--
 .../org/apache/beam/io/debezium/DebeziumTransformRegistrar.java  | 9 +++++++++
 4 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go 
b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
index eb43fd74175..547aba0ceb9 100644
--- a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
+++ b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
@@ -75,6 +75,7 @@ type readFromDebeziumSchema struct {
        Host                 string
        Port                 string
        MaxNumberOfRecords   *int64
+       MaxTimeToRun         *int64
        ConnectionProperties []string
 }
 
@@ -133,6 +134,13 @@ func MaxRecord(r int64) readOption {
        }
 }
 
+// MaxTimeToRun specifies maximum number of milliseconds to run before stop.
+func MaxTimeToRun(r int64) readOption {
+       return func(cfg *debeziumConfig) {
+               cfg.readSchema.MaxTimeToRun = &r
+       }
+}
+
 // ConnectionProperties specifies properties of the debezium connection passed 
as
 // a string with format [propertyName=property;]*
 func ConnectionProperties(cp []string) readOption {
diff --git a/sdks/go/test/integration/io/xlang/debezium/debezium.go 
b/sdks/go/test/integration/io/xlang/debezium/debezium.go
index 26e4d974abf..e1b9bab963c 100644
--- a/sdks/go/test/integration/io/xlang/debezium/debezium.go
+++ b/sdks/go/test/integration/io/xlang/debezium/debezium.go
@@ -29,8 +29,9 @@ func ReadPipeline(addr, username, password, dbname, host, 
port string, connector
        p, s := beam.NewPipelineWithRoot()
        result := debeziumio.Read(s.Scope("Read from debezium"), username, 
password, host, port,
                connectorClass, reflectx.String, 
debeziumio.MaxRecord(maxrecords),
+               debeziumio.MaxTimeToRun(120000),
                debeziumio.ConnectionProperties(connectionProperties), 
debeziumio.ExpansionAddr(addr))
-       expectedJson := 
`{"metadata":{"connector":"postgresql","version":"1.3.1.Final","name":"dbserver1","database":"inventory","schema":"inventory","table":"customers"},"before":null,"after":{"fields":{"last_name":"Thomas","id":1001,"first_name":"Sally","email":"[email protected]"}}}`
+       expectedJson := 
`{"metadata":{"connector":"postgresql","version":"3.1.3.Final","name":"beam-debezium-connector","database":"inventory","schema":"inventory","table":"customers"},"before":null,"after":{"fields":{"last_name":"Thomas","id":1001,"first_name":"Sally","email":"[email protected]"}}}`
        expected := beam.Create(s, expectedJson)
        passert.Equals(s, result, expected)
        return p
diff --git a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go 
b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go
index 208a062f943..a4850d4a3a3 100644
--- a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go
+++ b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go
@@ -34,7 +34,7 @@ import (
 )
 
 const (
-       debeziumImage = "quay.io/debezium/example-postgres:latest"
+       debeziumImage = "quay.io/debezium/example-postgres:3.1.3.Final"
        debeziumPort  = "5432/tcp"
        maxRetries    = 5
 )
@@ -82,7 +82,6 @@ func TestDebeziumIO_BasicRead(t *testing.T) {
        connectionProperties := []string{
                "database.dbname=inventory",
                "database.server.name=dbserver1",
-               "database.include.list=inventory",
                "include.schema.changes=false",
        }
        read := ReadPipeline(expansionAddr, username, password, dbname, host, 
port, debeziumio.PostgreSQL, 1, connectionProperties)
diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
index eb6732180b0..22a34ae2654 100644
--- 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
@@ -77,6 +77,7 @@ public class DebeziumTransformRegistrar implements 
ExternalTransformRegistrar {
     public static class Configuration extends CrossLanguageConfiguration {
       private @Nullable List<String> connectionProperties;
       private @Nullable Long maxNumberOfRecords;
+      private @Nullable Long maxTimeToRun;
 
       public void setConnectionProperties(@Nullable List<String> 
connectionProperties) {
         this.connectionProperties = connectionProperties;
@@ -85,6 +86,10 @@ public class DebeziumTransformRegistrar implements 
ExternalTransformRegistrar {
       public void setMaxNumberOfRecords(@Nullable Long maxNumberOfRecords) {
         this.maxNumberOfRecords = maxNumberOfRecords;
       }
+
+      public void setMaxTimeToRun(@Nullable Long maxTimeToRun) {
+        this.maxTimeToRun = maxTimeToRun;
+      }
     }
 
     @Override
@@ -114,6 +119,10 @@ public class DebeziumTransformRegistrar implements 
ExternalTransformRegistrar {
             
readTransform.withMaxNumberOfRecords(configuration.maxNumberOfRecords.intValue());
       }
 
+      if (configuration.maxTimeToRun != null) {
+        readTransform = 
readTransform.withMaxTimeToRun(configuration.maxTimeToRun);
+      }
+
       return readTransform;
     }
   }

Reply via email to