[ https://issues.apache.org/jira/browse/BEAM-13293?focusedWorklogId=715203&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-715203 ]

ASF GitHub Bot logged work on BEAM-13293:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Jan/22 18:35
            Start Date: 25/Jan/22 18:35
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #16111:
URL: https://github.com/apache/beam/pull/16111#discussion_r791976644



##########
File path: sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go
##########
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package jdbc
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+       "testing"
+       "time"
+
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       "github.com/apache/beam/sdks/v2/go/test/integration"
+       "github.com/docker/go-connections/nat"
+       _ "github.com/go-sql-driver/mysql"
+       _ "github.com/lib/pq"
+       "github.com/testcontainers/testcontainers-go"
+       "github.com/testcontainers/testcontainers-go/wait"
+)
+
+func checkFlags(t *testing.T) {
+       if *integration.IoExpansionAddr == "" {
+               t.Skip("No IO expansion address provided.")
+       }
+}
+
+func setupTestContainer(t *testing.T, dbname, username, password string) int {
+       t.Helper()
+
+       var env = map[string]string{
+               "POSTGRES_PASSWORD": password,
+               "POSTGRES_USER":     username,
+               "POSTGRES_DB":       dbname,
+       }
+
+       var port = "5432/tcp"

Review comment:
       If this is the port that the test container is exposing, then it's probably fine, but as a rule:
   
   Don't hardcode ports in tests. We don't know anything about the machines they're running on, and the port may already be in use.
   You can find an open port by requesting one from the operating system, similar to
   https://github.com/apache/beam/blob/6d471c920edd8aaf9a4d2eb628fe294ea750165b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go#L38
   
   (likely best to copy that findOpenPort code snippet.)
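   A minimal sketch of the usual Go approach described above: listen on port 0 so the OS assigns a free port, read the port back, then release it. This `findOpenPort` is written from scratch for illustration and is not necessarily identical to the linked `process.go` snippet. As far as the diff shows, `5432/tcp` is the container-internal port and the host side is already picked dynamically through `MappedPort`, so this matters mainly for ports bound directly on the host. There is a small race: another process can claim the port between `Close` and its reuse.

```go
package main

import (
	"fmt"
	"net"
)

// findOpenPort asks the operating system for a free TCP port by listening on
// port 0, reads back the port the kernel assigned, and then releases it.
// Caveat: another process may grab the port after Close returns.
func findOpenPort() (int, error) {
	ln, err := net.Listen("tcp", ":0")
	if err != nil {
		return 0, err
	}
	defer ln.Close()
	return ln.Addr().(*net.TCPAddr).Port, nil
}

func main() {
	port, err := findOpenPort()
	if err != nil {
		panic(err)
	}
	fmt.Println("free port:", port)
}
```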
   

##########
File path: sdks/java/container/boot.go
##########
@@ -138,6 +138,7 @@ func main() {
                filepath.Join(jarsDir, "beam-sdks-java-harness.jar"),
                filepath.Join(jarsDir, "beam-sdks-java-io-kafka.jar"),
                filepath.Join(jarsDir, "kafka-clients.jar"),
+               filepath.Join(jarsDir, "postgresql.jar"),

Review comment:
       If the Python tests already work, is this line even required? Isn't this supposed to be handled by the artifact fetching step?
   We may want to clarify this @chamikaramj and @ihji. Does adding this line here mean we're requiring all Java jobs to have this Jar?

##########
File path: sdks/go/pkg/beam/io/xlang/jdbcio/jdbc.go
##########
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package jdbcio contains cross-language functionality for reading and writing data to JDBC.
+// These transforms only work on runners that support cross-language transforms.
+//
+// Setup
+//
+// Transforms specified here are cross-language transforms implemented in a
+// different SDK (listed below). During pipeline construction, the Go SDK will
+// need to connect to an expansion service containing information on these
+// transforms in their native SDK.
+//
+// To use an expansion service, it must be run as a separate process accessible
+// during pipeline construction. The address of that process must be passed to
+// the transforms in this package.
+//
+// The version of the expansion service should match the version of the Beam SDK
+// being used. For numbered releases of Beam, these expansion services are
+// released to the Maven repository as modules. For development versions of
+// Beam, it is recommended to build and run it from source using Gradle.
+//
+// Run expansion service: ./gradlew :sdks:java:extensions:schemaio-expansion-service:build
+// java -jar <location_of_jar_file_generated_from_above> port
+
+package jdbcio
+
+import (
+       "bytes"
+       "fmt"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*jdbcConfigSchema)(nil)).Elem())
+       beam.RegisterType(reflect.TypeOf((*config)(nil)).Elem())
+}
+
+const (
+       readURN  = "beam:transform:org.apache.beam:schemaio_jdbc_read:v1"
+       writeURN = "beam:transform:org.apache.beam:schemaio_jdbc_write:v1"
+)
+
+// jdbcConfigSchema is the config schema as per the expected cross-language payload
+// for JDBC IO read and write transform.
+type jdbcConfigSchema struct {
+       Location   string  `beam:"location"`
+       Config     []byte  `beam:"config"`
+       DataSchema *[]byte `beam:"dataSchema"`
+}
+
+// config is used to set the config field of jdbcConfigSchema. It contains the
+// details required to make a connection to the JDBC database.
+type config struct {
+       DriverClassName       string    `beam:"driverClassName"`
+       JDBCUrl               string    `beam:"jdbcUrl"`
+       Username              string    `beam:"username"`
+       Password              string    `beam:"password"`
+       ConnectionProperties  *string   `beam:"connectionProperties"`
+       ConnectionInitSQLs    *[]string `beam:"connectionInitSqls"`
+       ReadQuery             *string   `beam:"readQuery"`
+       WriteStatement        *string   `beam:"writeStatement"`
+       FetchSize             *int16    `beam:"fetchSize"`
+       OutputParallelization *bool     `beam:"outputParallelization"`
+}
+
+func toRow(pl interface{}) []byte {
+       rt := reflect.TypeOf(pl)
+
+       enc, err := coder.RowEncoderForStruct(rt)
+       if err != nil {
+               panic(fmt.Errorf("unable to get row encoder: %v", err))
+       }
+       var buf bytes.Buffer
+       if err := enc(pl, &buf); err != nil {
+               panic(fmt.Errorf("unable to do row encoding: %v", err))
+       }
+       return buf.Bytes()
+}
+
+// Write is a cross-language PTransform which writes Rows to the specified database via JDBC.
+// Write requires the address for an expansion service. tableName is a required parameter,
+// and by default, the writeStatement is generated from it. The generated writeStatement
+// can be overridden by passing in a WriteStatement option.
+//
+// The default write statement is: "INSERT INTO tableName(column1, ...) VALUES(value1, ...)"
+// Example:
+//
+// expansionAddr := "localhost:9000"
+// tableName := "roles"
+// driverClassName := "org.postgresql.Driver"
+// username := "root"
+// password := "root123"
+// jdbcUrl := "jdbc:postgresql://localhost:5432/dbname"
+// jdbcio.Write(s, expansionAddr, tableName, driverClassName, jdbcUrl, username, password)
+func Write(s beam.Scope, addr, tableName, driverClassName, jdbcUrl, username, password string, col beam.PCollection, opts ...writeOption) {
+       s = s.Scope("jdbcio.Write")
+
+       wpl := config{
+               DriverClassName: driverClassName,
+               JDBCUrl:         jdbcUrl,
+               Username:        username,
+               Password:        password,
+       }
+       for _, opt := range opts {
+               opt(&wpl)
+       }
+       jcs := jdbcConfigSchema{
+               Location: tableName,
+               Config:   toRow(wpl),
+       }
+       pl := beam.CrossLanguagePayload(jcs)
+       beam.CrossLanguage(s, writeURN, pl, addr, beam.UnnamedInput(col), nil)
+}
+
+type writeOption func(*config)
+
+// WriteStatement option overrides the default write statement of
+// "INSERT INTO tableName(column1, ...) VALUES(value1, ...)".
+func WriteStatement(statement string) writeOption {
+       return func(pl *config) {
+               pl.WriteStatement = &statement
+       }
+}
+
+// WriteConnectionProperties sets the properties of the JDBC connection, passed as a string
+// with the format [propertyName=property;].
+func WriteConnectionProperties(properties string) writeOption {
+       return func(pl *config) {
+               pl.ConnectionProperties = &properties
+       }
+}
+
+// ConnectionInitSQLs sets the SQL statements used to initialize the connection,
+// passed as a list of strings. It is required only for MySQL and MariaDB.
+func ConnectionInitSQLs(initStatements []string) writeOption {
+       return func(pl *config) {
+               pl.ConnectionInitSQLs = &initStatements
+       }
+}
+
+// Read is a cross-language PTransform which reads Rows from the specified database via JDBC.
+// Read requires the address for an expansion service for JDBC Read transforms.
+// tableName is a required parameter, and by default, the readQuery is generated from it.
+// The generated readQuery can be overridden by passing in a ReadQuery option.
+//
+// The default read query is "SELECT * FROM tableName;"
+//
+// Read also accepts optional parameters as readOptions. All optional parameters
+// are predefined in this package as functions that return readOption. To set
+// an optional parameter, call the function within Read's function signature.
+//
+// Example:
+// expansionAddr := "localhost:9000"
+// tableName := "roles"
+// driverClassName := "org.postgresql.Driver"
+// username := "root"
+// password := "root123"
+// jdbcUrl := "jdbc:postgresql://localhost:5432/dbname"
+// jdbcio.Read(s, expansionAddr, tableName, driverClassName, jdbcUrl, username, password)
+func Read(s beam.Scope, addr, tableName, driverClassName, jdbcUrl, username, password string, j interface{}, opts ...readOption) beam.PCollection {
+       s = s.Scope("jdbcio.Read")
+
+       rpl := config{
+               DriverClassName: driverClassName,
+               JDBCUrl:         jdbcUrl,
+               Username:        username,
+               Password:        password,
+       }
+       for _, opt := range opts {
+               opt(&rpl)
+       }
+       jcs := jdbcConfigSchema{
+               Location: tableName,
+               Config:   toRow(rpl),
+       }
+
+       pl := beam.CrossLanguagePayload(jcs)
+       result := beam.CrossLanguage(s, readURN, pl, addr, nil, beam.UnnamedOutput(typex.New(reflect.TypeOf(j))))
+       return result[beam.UnnamedOutputTag()]
+}
+
+type readOption func(*config)
+
+// ReadQuery overrides the default read query "SELECT * FROM tableName;"
+func ReadQuery(query string) readOption {
+       return func(pl *config) {
+               pl.ReadQuery = &query
+       }
+}
+
+// OutputParallelization  specifies if output parallelization on.

Review comment:
       extra space & grammar. 
   ```suggestion
   // OutputParallelization specifies if output parallelization is on.
   ```
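   For context, the option functions in this file follow Go's functional-options pattern: each option returns a closure that sets one pointer field, so a nil field means "not set". A self-contained sketch of that pattern, using a trimmed-down hypothetical `config`, and assuming `OutputParallelization` takes a `bool` and sets the `*bool` field (its body is not shown in this hunk). The `effectiveQuery` helper is also hypothetical: it only illustrates the documented `SELECT * FROM tableName;` fallback, which in jdbcio itself is presumably applied by the Java expansion service rather than in Go.

```go
package main

import "fmt"

// config is a trimmed-down, hypothetical stand-in for jdbcio's config struct.
// Pointer fields let "not set" (nil) be told apart from an explicit value.
type config struct {
	ReadQuery             *string
	OutputParallelization *bool
}

// readOption mirrors the option type in the diff: a function that mutates config.
type readOption func(*config)

// ReadQuery overrides the default read query.
func ReadQuery(query string) readOption {
	return func(c *config) { c.ReadQuery = &query }
}

// OutputParallelization specifies if output parallelization is on.
func OutputParallelization(enabled bool) readOption {
	return func(c *config) { c.OutputParallelization = &enabled }
}

// newConfig applies the options, in order, to a zero-valued config.
func newConfig(opts ...readOption) config {
	var c config
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

// effectiveQuery returns the overridden query if set, else the documented default.
func effectiveQuery(c config, tableName string) string {
	if c.ReadQuery != nil {
		return *c.ReadQuery
	}
	return fmt.Sprintf("SELECT * FROM %s;", tableName)
}

func main() {
	fmt.Println(effectiveQuery(newConfig(), "roles"))                               // SELECT * FROM roles;
	fmt.Println(effectiveQuery(newConfig(ReadQuery("SELECT id FROM roles")), "roles")) // SELECT id FROM roles
}
```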

##########
File path: sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go
##########
@@ -0,0 +1,114 @@
+       var port = "5432/tcp"
+       dbURL := func(port nat.Port) string {
+               return fmt.Sprintf("postgres://%s:%s@localhost:%s/%s?sslmode=disable", username, password, port.Port(), dbname)
+       }
+
+       req := testcontainers.GenericContainerRequest{
+               ContainerRequest: testcontainers.ContainerRequest{
+                       Image:        "postgres",
+                       ExposedPorts: []string{port},
+                       Env:          env,
+                       WaitingFor:   wait.ForSQL(nat.Port(port), "postgres", dbURL).Timeout(time.Second * 5),
+               },
+               Started: true,
+       }
+       ctx := context.Background()
+       container, err := testcontainers.GenericContainer(ctx, req)
+       if err != nil {
+               t.Fatalf("failed to start container: %s", err)
+       }
+
+       mappedPort, err := container.MappedPort(ctx, nat.Port(port))
+       if err != nil {
+               t.Fatalf("failed to get container external port: %s", err)
+       }
+       p := mappedPort.Int()

Review comment:
       Consider just calling this at the return line instead, as you aren't using the variable p anywhere else.
   
   Go's idiom for variable names, "short variable names for short scopes", only applies insofar as the use is pretty clear or nearby; 13 lines doesn't cut it, and the variable doesn't add clarity.
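   Once the mapped host port is known, the test needs it in two forms: the lib/pq URL built by `dbURL`, and presumably a JDBC URL of the form shown in the jdbcio doc example (`jdbc:postgresql://localhost:5432/dbname`). A sketch of a hypothetical helper, `postgresURLs`, that builds both straight from the port. It uses `net/url` so that credentials containing characters such as `@` are escaped, which the `fmt.Sprintf` version in the diff does not do.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// postgresURLs is a hypothetical helper: given the host port mapped to the
// container's 5432/tcp, it returns the lib/pq URL used on the Go side of the
// test and the JDBC URL that would be handed to the Java expansion.
func postgresURLs(host string, port int, dbname, username, password string) (goURL, jdbcURL string) {
	hostPort := host + ":" + strconv.Itoa(port)
	u := url.URL{
		Scheme:   "postgres",
		User:     url.UserPassword(username, password), // escaped by String()
		Host:     hostPort,
		Path:     "/" + dbname,
		RawQuery: "sslmode=disable",
	}
	jdbcURL = fmt.Sprintf("jdbc:postgresql://%s/%s", hostPort, dbname)
	return u.String(), jdbcURL
}

func main() {
	goURL, jdbcURL := postgresURLs("localhost", 5432, "dbname", "root", "root123")
	fmt.Println(goURL)   // postgres://root:root123@localhost:5432/dbname?sslmode=disable
	fmt.Println(jdbcURL) // jdbc:postgresql://localhost:5432/dbname
}
```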




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 715203)
    Time Spent: 7.5h  (was: 7h 20m)

> Go SDK JDBC IO XLang Transform
> ------------------------------
>
>                 Key: BEAM-13293
>                 URL: https://issues.apache.org/jira/browse/BEAM-13293
>             Project: Beam
>          Issue Type: New Feature
>          Components: cross-language, sdk-go
>            Reporter: Ritesh Ghorse
>            Assignee: Ritesh Ghorse
>            Priority: P2
>          Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> Add a cross language transform support in Go SDK for JDBC IO.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
