[ 
https://issues.apache.org/jira/browse/BEAM-13761?focusedWorklogId=720218&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-720218
 ]

ASF GitHub Bot logged work on BEAM-13761:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Feb/22 15:25
            Start Date: 03/Feb/22 15:25
    Worklog Time Spent: 10m 
      Work Description: riteshghorse commented on a change in pull request 
#16642:
URL: https://github.com/apache/beam/pull/16642#discussion_r798678519



##########
File path: sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
##########
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package debeziumio contains cross-language functionality for using Debezium
+// (http://kafka.apache.org/). These transforms only work on runners that
+// support cross-language transforms.
+//
+// Setup
+//
+// Transforms specified here are cross-language transforms implemented in a
+// different SDK (listed below). During pipeline construction, the Go SDK will
+// need to connect to an expansion service containing information on these
+// transforms in their native SDK.
+//
+// To use an expansion service, it must be run as a separate process accessible
+// during pipeline construction. The address of that process must be passed to
+// the transforms in this package.
+//
+// The version of the expansion service should match the version of the Beam 
SDK
+// being used. For numbered releases of Beam, these expansions services are
+// released to the Maven repository as modules. For development versions of
+// Beam, it is recommended to build and run it from source using Gradle.
+//
+// Current supported SDKs including expansion service modules
+// * Java
+//       - Vendored Module: beam-sdks-java-io-debezium-expansion-service
+//       - Run via Gradle: ./gradlew 
:sdks:java:io:debezium:expansion-service:shadowJar
+//                                              java -jar 
<path-to-debezium-jar> <port>
+//    - Reference Class: org.apache.beam.io.debezium.DebeziumIO
+package debeziumio
+
+import (
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+// DriverClassName is the type for valid and supported Database connectors for 
Debezium IO.
+type DriverClassName string
+
+const (
+       // MySQL connector for Debezium
+       MySQL DriverClassName = "MySQL"
+       // PostgreSQL connector for Debezium
+       PostgreSQL = "PostgreSQL"
+)
+
+const readURN = "beam:transform:org.apache.beam:debezium_read:v1"
+
+// readFromDebeziumSchema is config schema that matches exactly with the 
Java's Debezium IO
+// for cross language payload.
+type readFromDebeziumSchema struct {
+       ConnectorClass       string
+       Username             string
+       Password             string
+       Host                 string
+       Port                 string
+       MaxNumberOfRecords   *int64
+       ConnectionProperties []string
+}
+
+type debeziumConfig struct {
+       expansionService string
+       readSchema       *readFromDebeziumSchema
+}
+
+// readOption facilitates additional parameters to debeziumio.Read() 
Ptransform.
+type readOption func(*debeziumConfig)
+
+// Read is an external PTransform which reads from Debezium and returns a
+// JSON string. It requires the address of an expansion service for Debezium 
IO.
+//
+// Example:
+//      username := "debezium"
+//   password := "dbz"
+//   host := "localhost"
+//   port := "5432"
+//   connectorClass := debeziumIO.POSTGRESQL
+//   maxrecords := 1
+//   debeziumio.Read(s.Scope("Read from debezium"), expansionAddr, username, 
password, host, port, connectorClass,
+//                   reflectx.String, debeziumio.MaxRecord(maxrecords), 
debeziumio.ExpansionService("localhost:9000"))
+func Read(s beam.Scope, username, password, host, port string, connectorClass 
DriverClassName, t reflect.Type, opts ...readOption) beam.PCollection {
+       rfds := readFromDebeziumSchema{
+               ConnectorClass: string(connectorClass),
+               Username:       username,
+               Password:       password,
+               Host:           host,
+               Port:           port,
+       }
+       dc := debeziumConfig{readSchema: &rfds}
+       for _, opt := range opts {
+               opt(&dc)
+       }
+       pl := beam.CrossLanguagePayload(rfds)
+       outT := beam.UnnamedOutput(typex.New(t))
+       out := beam.CrossLanguage(s, readURN, pl, dc.expansionService, nil, 
outT)

Review comment:
       Got it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 720218)
    Time Spent: 3.5h  (was: 3h 20m)

> Xlang Debezium IO wrapper for Go SDK
> ------------------------------------
>
>                 Key: BEAM-13761
>                 URL: https://issues.apache.org/jira/browse/BEAM-13761
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>            Reporter: Ritesh Ghorse
>            Assignee: Ritesh Ghorse
>            Priority: P2
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Implement a wrapper for Debezium IO  for Go SDK. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to