This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 78e1c0a0184 [Go SDK] Dataframe API wrapper  (#23450)
78e1c0a0184 is described below

commit 78e1c0a0184c9fa08100ecfc02e73defc472016e
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Tue Oct 18 11:04:23 2022 -0400

    [Go SDK] Dataframe API wrapper  (#23450)
---
 CHANGES.md                                         |   1 +
 sdks/go/pkg/beam/schema.go                         |  17 +++
 .../beam/transforms/xlang/dataframe/dataframe.go   |  87 ++++++++++++++
 .../pkg/beam/transforms/xlang/python/external.go   | 129 +++++++++++++++++++++
 sdks/go/test/integration/integration.go            |   1 +
 .../transforms/xlang/dataframe/dataframe.go        |  48 ++++++++
 .../transforms/xlang/dataframe/dataframe_test.go   |  60 ++++++++++
 7 files changed, 343 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index bbcc420caf4..fc04e56fca5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -69,6 +69,7 @@
 ## New Features / Improvements
 
 * X feature added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+* Dataframe wrapper added in Go SDK via Cross-Language (requires manually starting a 
Python expansion service). (Go) 
([#23384](https://github.com/apache/beam/issues/23384)).
 * Name all Java threads to aid in debugging 
([#23049](https://github.com/apache/beam/issues/23049)).
 
 ## Breaking Changes
diff --git a/sdks/go/pkg/beam/schema.go b/sdks/go/pkg/beam/schema.go
index b25a3e285f7..2a3134455d9 100644
--- a/sdks/go/pkg/beam/schema.go
+++ b/sdks/go/pkg/beam/schema.go
@@ -77,6 +77,23 @@ func RegisterSchemaProvider(rt reflect.Type, provider 
interface{}) {
        coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder)
 }
 
+// RegisterSchemaProviderWithURN is for internal use only. Users are 
recommended to use
+// beam.RegisterSchemaProvider() instead.
+// RegisterSchemaProviderWithURN registers a new schema provider for a new 
logical type defined
+// in pkg/beam/model/pipeline_v1/schema.pb.go
+//
+// RegisterSchemaProviderWithURN must be called before beam.Init(), and 
conventionally
+// is called in a package init() function.
+func RegisterSchemaProviderWithURN(rt reflect.Type, provider interface{}, urn 
string) {
+       p := provider.(SchemaProvider)
+       st, err := p.FromLogicalType(rt)
+       if err != nil {
+               panic(fmt.Sprintf("beam.RegisterSchemaProvider: schema type 
provider for %v, doesn't support that type", rt))
+       }
+       schema.RegisterLogicalType(schema.ToLogicalType(urn, rt, st))
+       coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder)
+}
+
 // SchemaProvider specializes schema handling for complex types, including 
conversion to a
 // valid schema base type,
 //
diff --git a/sdks/go/pkg/beam/transforms/xlang/dataframe/dataframe.go 
b/sdks/go/pkg/beam/transforms/xlang/dataframe/dataframe.go
new file mode 100644
index 00000000000..b4f4c37f115
--- /dev/null
+++ b/sdks/go/pkg/beam/transforms/xlang/dataframe/dataframe.go
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package dataframe is a wrapper for DataframeTransform defined in Apache 
Beam Python SDK.
+// An expansion service for python external transforms can be started by 
running
+// $ python -m apache_beam.runners.portability.expansion_service_main -p 
$PORT_FOR_EXPANSION_SERVICE
+package dataframe
+
+import (
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/python"
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*config)(nil)).Elem())
+       beam.RegisterType(reflect.TypeOf((*kwargs)(nil)).Elem())
+       beam.RegisterType(reflect.TypeOf((*argStruct)(nil)).Elem())
+}
+
+type kwargs struct {
+       Fn             python.CallableSource `beam:"func"`
+       IncludeIndexes bool                  `beam:"include_indexes"`
+}
+
+type argStruct struct{}
+
+type config struct {
+       dpl           kwargs
+       expansionAddr string
+}
+
+type configOption func(*config)
+
+// WithExpansionAddr sets a URL for a Python expansion service.
+func WithExpansionAddr(expansionAddr string) configOption {
+       return func(c *config) {
+               c.expansionAddr = expansionAddr
+       }
+}
+
+// WithIndexes sets include_indexes option for DataframeTransform.
+func WithIndexes() configOption {
+       return func(c *config) {
+               c.dpl.IncludeIndexes = true
+       }
+}
+
+// Transform is a multi-language wrapper for a Python DataframeTransform with 
a given lambda function.
+// lambda function is a required parameter.
+// Additional option for including indexes in dataframe can be provided by 
using
+// dataframe.WithIndexes().
+func Transform(s beam.Scope, fn string, col beam.PCollection, outT 
reflect.Type, opts ...configOption) beam.PCollection {
+       s.Scope("xlang.python.DataframeTransform")
+       cfg := config{
+               dpl: kwargs{Fn: python.CallableSource(fn)},
+       }
+       for _, opt := range opts {
+               opt(&cfg)
+       }
+
+       // TODO: load automatic expansion service here
+       if cfg.expansionAddr == "" {
+               panic("no expansion service address provided for 
xlang.DataframeTransform(), pass xlang.WithExpansionAddr(address) as a param.")
+       }
+
+       pet := python.NewExternalTransform[argStruct, 
kwargs]("apache_beam.dataframe.transforms.DataframeTransform")
+       pet.WithKwargs(cfg.dpl)
+       pl := beam.CrossLanguagePayload(pet)
+       result := beam.CrossLanguage(s, 
"beam:transforms:python:fully_qualified_named", pl, cfg.expansionAddr, 
beam.UnnamedInput(col), beam.UnnamedOutput(typex.New(outT)))
+       return result[beam.UnnamedOutputTag()]
+
+}
diff --git a/sdks/go/pkg/beam/transforms/xlang/python/external.go 
b/sdks/go/pkg/beam/transforms/xlang/python/external.go
new file mode 100644
index 00000000000..3eb14c3c134
--- /dev/null
+++ b/sdks/go/pkg/beam/transforms/xlang/python/external.go
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package python contains data structures required for python external 
transforms in a multilanguage pipeline.
+package python
+
+import (
+       "fmt"
+       "io"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+)
+
+const (
+       pythonCallableUrn = "beam:logical_type:python_callable:v1"
+)
+
+var (
+       pcsType        = reflect.TypeOf((*CallableSource)(nil)).Elem()
+       pcsStorageType = reflectx.String
+)
+
+func init() {
+       beam.RegisterType(pcsType)
+       beam.RegisterSchemaProviderWithURN(pcsType, &callableSourceProvider{}, 
pythonCallableUrn)
+}
+
+// CallableSource is a wrapper object storing a Python function definition
+// that can be evaluated to Python callables in Python SDK.
+//
+// The snippet of Python code can be a valid Python expression such as
+//    lambda x: x * x
+//       str.upper
+// a fully qualified name such as
+//    math.sin
+// or a complete multi-line function or class definition such as
+//    def foo(x):
+//        ...
+//    class Foo:
+//        ...
+//
+// Any lines preceding the function definition are first evaluated to provide 
context in which to
+// define the function which can be useful to declare imports or any other 
needed values, e.g.
+//    import math
+//
+//    def helper(x):
+//        return x * x
+//
+//    def func(y):
+//        return helper(y) + y
+// in which case `func` would get applied to each element.
+type CallableSource string
+
+// callableSourceProvider implements the SchemaProvider interface for logical 
types
+type callableSourceProvider struct{}
+
+// FromLogicalType returns the storage type backing the CallableSource logical type
+func (p *callableSourceProvider) FromLogicalType(rt reflect.Type) 
(reflect.Type, error) {
+       if rt != pcsType {
+               return nil, fmt.Errorf("unable to provide schema.LogicalType 
for type %v, want %v", rt, pcsType)
+       }
+       return pcsStorageType, nil
+}
+
+// BuildEncoder returns an encoder function for the CallableSource logical type
+func (p *callableSourceProvider) BuildEncoder(rt reflect.Type) 
(func(interface{}, io.Writer) error, error) {
+       if _, err := p.FromLogicalType(rt); err != nil {
+               return nil, err
+       }
+
+       return func(iface interface{}, w io.Writer) error {
+               v := iface.(CallableSource)
+               return coder.EncodeStringUTF8(string(v), w)
+       }, nil
+}
+
+// BuildDecoder returns a decoder function for the CallableSource logical type
+func (p *callableSourceProvider) BuildDecoder(rt reflect.Type) 
(func(io.Reader) (interface{}, error), error) {
+       if _, err := p.FromLogicalType(rt); err != nil {
+               return nil, err
+       }
+
+       return func(r io.Reader) (interface{}, error) {
+               s, err := coder.DecodeStringUTF8(r)
+               if err != nil {
+                       return nil, err
+               }
+               return CallableSource(s), nil
+       }, nil
+}
+
+// NewExternalTransform creates a new instance for python external transform. 
It accepts two types:
+// A: used for normal arguments
+// K: used for keyword arguments
+func NewExternalTransform[A, K any](constructor string) 
*pythonExternalTransform[A, K] {
+       return &pythonExternalTransform[A, K]{Constructor: constructor}
+}
+
+// pythonExternalTransform holds the details required for an External Python 
Transform.
+type pythonExternalTransform[A, K any] struct {
+       Constructor string `beam:"constructor"`
+       Args        A      `beam:"args"`
+       Kwargs      K      `beam:"kwargs"`
+}
+
+// WithArgs adds arguments to the External Python Transform.
+func (p *pythonExternalTransform[A, K]) WithArgs(args any) {
+       p.Args = args.(A)
+}
+
+// WithKwargs adds keyword arguments to the External Python Transform.
+func (p *pythonExternalTransform[A, K]) WithKwargs(kwargs any) {
+       p.Kwargs = kwargs.(K)
+}
diff --git a/sdks/go/test/integration/integration.go 
b/sdks/go/test/integration/integration.go
index b70a861064b..26acc5137e6 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -71,6 +71,7 @@ var directFilters = []string{
        "TestDebeziumIO_BasicRead",
        "TestJDBCIO_BasicReadWrite",
        "TestJDBCIO_PostgresReadWrite",
+       "TestDataframe",
        // Triggers, Panes are not yet supported
        "TestTrigger.*",
        "TestPanes",
diff --git a/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go 
b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go
new file mode 100644
index 00000000000..8a5a6339841
--- /dev/null
+++ b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataframe
+
+import (
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/dataframe"
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*TestRow)(nil)).Elem())
+}
+
+type TestRow struct {
+       A int64 `beam:"a"`
+       B int32 `beam:"b"`
+}
+
+func DataframeTransform(expansionAddr string) *beam.Pipeline {
+       row0 := TestRow{A: int64(100), B: int32(1)}
+       row1 := TestRow{A: int64(100), B: int32(2)}
+       row2 := TestRow{A: int64(100), B: int32(3)}
+       row3 := TestRow{A: int64(200), B: int32(4)}
+
+       p, s := beam.NewPipelineWithRoot()
+
+       input := beam.Create(s, row0, row1, row3)
+       outCol := dataframe.Transform(s, "lambda df: df.groupby('a').sum()", 
input, reflect.TypeOf((*TestRow)(nil)).Elem(), 
dataframe.WithExpansionAddr(expansionAddr), dataframe.WithIndexes())
+
+       passert.Equals(s, outCol, row2, row3)
+       return p
+}
diff --git 
a/sdks/go/test/integration/transforms/xlang/dataframe/dataframe_test.go 
b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe_test.go
new file mode 100644
index 00000000000..4d73e7dc931
--- /dev/null
+++ b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe_test.go
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataframe
+
+import (
+       "flag"
+       "log"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       "github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+var expansionAddr string // Populate with expansion address labelled 
"python_transform".
+
+func checkFlags(t *testing.T) {
+       if expansionAddr == "" {
+               t.Skip("No python transform expansion address provided.")
+       }
+}
+
+func TestDataframe(t *testing.T) {
+       integration.CheckFilters(t)
+       checkFlags(t)
+       p := DataframeTransform(expansionAddr)
+       ptest.RunAndValidate(t, p)
+}
+
+func TestMain(m *testing.M) {
+       flag.Parse()
+       beam.Init()
+
+       services := integration.NewExpansionServices()
+       defer func() { services.Shutdown() }()
+       addr, err := services.GetAddr("python_transform")
+       if err != nil {
+               log.Printf("skipping missing expansion service: %v", err)
+       } else {
+               expansionAddr = addr
+       }
+
+       ptest.MainRet(m)
+}

Reply via email to