This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 78e1c0a0184 [Go SDK] Dataframe API wrapper (#23450)
78e1c0a0184 is described below
commit 78e1c0a0184c9fa08100ecfc02e73defc472016e
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Tue Oct 18 11:04:23 2022 -0400
[Go SDK] Dataframe API wrapper (#23450)
---
CHANGES.md | 1 +
sdks/go/pkg/beam/schema.go | 17 +++
.../beam/transforms/xlang/dataframe/dataframe.go | 87 ++++++++++++++
.../pkg/beam/transforms/xlang/python/external.go | 129 +++++++++++++++++++++
sdks/go/test/integration/integration.go | 1 +
.../transforms/xlang/dataframe/dataframe.go | 48 ++++++++
.../transforms/xlang/dataframe/dataframe_test.go | 60 ++++++++++
7 files changed, 343 insertions(+)
diff --git a/CHANGES.md b/CHANGES.md
index bbcc420caf4..fc04e56fca5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -69,6 +69,7 @@
## New Features / Improvements
* X feature added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
+* Dataframe wrapper added in Go SDK via Cross-Language (requires manually starting a
Python expansion service). (Go)
([#23384](https://github.com/apache/beam/issues/23384)).
* Name all Java threads to aid in debugging
([#23049](https://github.com/apache/beam/issues/23049)).
## Breaking Changes
diff --git a/sdks/go/pkg/beam/schema.go b/sdks/go/pkg/beam/schema.go
index b25a3e285f7..2a3134455d9 100644
--- a/sdks/go/pkg/beam/schema.go
+++ b/sdks/go/pkg/beam/schema.go
@@ -77,6 +77,23 @@ func RegisterSchemaProvider(rt reflect.Type, provider
interface{}) {
coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder)
}
+// RegisterSchemaProviderWithURN is for internal use only. Users are
recommended to use
+// beam.RegisterSchemaProvider() instead.
+// RegisterSchemaProviderWithURN registers a new schema provider for a new
logical type defined
+// in pkg/beam/model/pipeline_v1/schema.pb.go
+//
+// RegisterSchemaProviderWithURN must be called before beam.Init(), and
conventionally
+// is called in a package init() function.
+func RegisterSchemaProviderWithURN(rt reflect.Type, provider interface{}, urn
string) {
+ p := provider.(SchemaProvider)
+ st, err := p.FromLogicalType(rt)
+ if err != nil {
+ panic(fmt.Sprintf("beam.RegisterSchemaProvider: schema type
provider for %v, doesn't support that type", rt))
+ }
+ schema.RegisterLogicalType(schema.ToLogicalType(urn, rt, st))
+ coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder)
+}
+
// SchemaProvider specializes schema handling for complex types, including
conversion to a
// valid schema base type,
//
diff --git a/sdks/go/pkg/beam/transforms/xlang/dataframe/dataframe.go
b/sdks/go/pkg/beam/transforms/xlang/dataframe/dataframe.go
new file mode 100644
index 00000000000..b4f4c37f115
--- /dev/null
+++ b/sdks/go/pkg/beam/transforms/xlang/dataframe/dataframe.go
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package dataframe is a wrapper for DataframeTransform defined in Apache
Beam Python SDK.
+// An expansion service for python external transforms can be started by
running
+// $ python -m apache_beam.runners.portability.expansion_service_main -p
$PORT_FOR_EXPANSION_SERVICE
+package dataframe
+
+import (
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/python"
+)
+
+func init() {
+ beam.RegisterType(reflect.TypeOf((*config)(nil)).Elem())
+ beam.RegisterType(reflect.TypeOf((*kwargs)(nil)).Elem())
+ beam.RegisterType(reflect.TypeOf((*argStruct)(nil)).Elem())
+}
+
+type kwargs struct {
+ Fn python.CallableSource `beam:"func"`
+ IncludeIndexes bool `beam:"include_indexes"`
+}
+
+type argStruct struct{}
+
+type config struct {
+ dpl kwargs
+ expansionAddr string
+}
+
+type configOption func(*config)
+
+// WithExpansionAddr sets a URL for a Python expansion service.
+func WithExpansionAddr(expansionAddr string) configOption {
+ return func(c *config) {
+ c.expansionAddr = expansionAddr
+ }
+}
+
+// WithIndexes sets include_indexes option for DataframeTransform.
+func WithIndexes() configOption {
+ return func(c *config) {
+ c.dpl.IncludeIndexes = true
+ }
+}
+
+// Transform is a multi-language wrapper for a Python DataframeTransform with
a given lambda function.
+// lambda function is a required parameter.
+// Additional option for including indexes in dataframe can be provided by
using
+// dataframe.WithIndexes().
+func Transform(s beam.Scope, fn string, col beam.PCollection, outT
reflect.Type, opts ...configOption) beam.PCollection {
+ s.Scope("xlang.python.DataframeTransform")
+ cfg := config{
+ dpl: kwargs{Fn: python.CallableSource(fn)},
+ }
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ // TODO: load automatic expansion service here
+ if cfg.expansionAddr == "" {
+ panic("no expansion service address provided for
xlang.DataframeTransform(), pass xlang.WithExpansionAddr(address) as a param.")
+ }
+
+ pet := python.NewExternalTransform[argStruct,
kwargs]("apache_beam.dataframe.transforms.DataframeTransform")
+ pet.WithKwargs(cfg.dpl)
+ pl := beam.CrossLanguagePayload(pet)
+ result := beam.CrossLanguage(s,
"beam:transforms:python:fully_qualified_named", pl, cfg.expansionAddr,
beam.UnnamedInput(col), beam.UnnamedOutput(typex.New(outT)))
+ return result[beam.UnnamedOutputTag()]
+
+}
diff --git a/sdks/go/pkg/beam/transforms/xlang/python/external.go
b/sdks/go/pkg/beam/transforms/xlang/python/external.go
new file mode 100644
index 00000000000..3eb14c3c134
--- /dev/null
+++ b/sdks/go/pkg/beam/transforms/xlang/python/external.go
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package python contains data structures required for python external
transforms in a multilanguage pipeline.
+package python
+
+import (
+ "fmt"
+ "io"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+)
+
+const (
+ pythonCallableUrn = "beam:logical_type:python_callable:v1"
+)
+
+var (
+ pcsType = reflect.TypeOf((*CallableSource)(nil)).Elem()
+ pcsStorageType = reflectx.String
+)
+
+func init() {
+ beam.RegisterType(pcsType)
+ beam.RegisterSchemaProviderWithURN(pcsType, &callableSourceProvider{},
pythonCallableUrn)
+}
+
+// CallableSource is a wrapper object storing a Python function definition
+// that can be evaluated to Python callables in Python SDK.
+//
+// The snippet of Python code can be a valid Python expression such as
+// lambda x: x * x
+// str.upper
+// a fully qualified name such as
+// math.sin
+// or a complete multi-line function or class definition such as
+// def foo(x):
+// ...
+// class Foo:
+// ...
+//
+// Any lines preceding the function definition are first evaluated to provide
context in which to
+// define the function which can be useful to declare imports or any other
needed values, e.g.
+// import math
+//
+// def helper(x):
+// return x * x
+//
+// def func(y):
+// return helper(y) + y
+// in which case `func` would get applied to each element.
+type CallableSource string
+
+// callableSourceProvider implements the SchemaProvider interface for logical
types.
+type callableSourceProvider struct{}
+
+// FromLogicalType returns the goType of the logical type
+func (p *callableSourceProvider) FromLogicalType(rt reflect.Type)
(reflect.Type, error) {
+ if rt != pcsType {
+ return nil, fmt.Errorf("unable to provide schema.LogicalType
for type %v, want %v", rt, pcsType)
+ }
+ return pcsStorageType, nil
+}
+
+// BuildEncoder returns an encoder function for the CallableSource logical type
+func (p *callableSourceProvider) BuildEncoder(rt reflect.Type)
(func(interface{}, io.Writer) error, error) {
+ if _, err := p.FromLogicalType(rt); err != nil {
+ return nil, err
+ }
+
+ return func(iface interface{}, w io.Writer) error {
+ v := iface.(CallableSource)
+ return coder.EncodeStringUTF8(string(v), w)
+ }, nil
+}
+
+// BuildDecoder returns a decoder function for the CallableSource logical type
+func (p *callableSourceProvider) BuildDecoder(rt reflect.Type)
(func(io.Reader) (interface{}, error), error) {
+ if _, err := p.FromLogicalType(rt); err != nil {
+ return nil, err
+ }
+
+ return func(r io.Reader) (interface{}, error) {
+ s, err := coder.DecodeStringUTF8(r)
+ if err != nil {
+ return nil, err
+ }
+ return CallableSource(s), nil
+ }, nil
+}
+
+// NewExternalTransform creates a new instance for python external transform.
It accepts two types:
+// A: used for normal arguments
+// K: used for keyword arguments
+func NewExternalTransform[A, K any](constructor string)
*pythonExternalTransform[A, K] {
+ return &pythonExternalTransform[A, K]{Constructor: constructor}
+}
+
+// pythonExternalTransform holds the details required for an External Python
Transform.
+type pythonExternalTransform[A, K any] struct {
+ Constructor string `beam:"constructor"`
+ Args A `beam:"args"`
+ Kwargs K `beam:"kwargs"`
+}
+
+// WithArgs adds arguments to the External Python Transform.
+func (p *pythonExternalTransform[A, K]) WithArgs(args any) {
+ p.Args = args.(A)
+}
+
+// WithKwargs adds keyword arguments to the External Python Transform.
+func (p *pythonExternalTransform[A, K]) WithKwargs(kwargs any) {
+ p.Kwargs = kwargs.(K)
+}
diff --git a/sdks/go/test/integration/integration.go
b/sdks/go/test/integration/integration.go
index b70a861064b..26acc5137e6 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -71,6 +71,7 @@ var directFilters = []string{
"TestDebeziumIO_BasicRead",
"TestJDBCIO_BasicReadWrite",
"TestJDBCIO_PostgresReadWrite",
+ "TestDataframe",
// Triggers, Panes are not yet supported
"TestTrigger.*",
"TestPanes",
diff --git a/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go
b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go
new file mode 100644
index 00000000000..8a5a6339841
--- /dev/null
+++ b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataframe
+
+import (
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/dataframe"
+)
+
+func init() {
+ beam.RegisterType(reflect.TypeOf((*TestRow)(nil)).Elem())
+}
+
+type TestRow struct {
+ A int64 `beam:"a"`
+ B int32 `beam:"b"`
+}
+
+func DataframeTransform(expansionAddr string) *beam.Pipeline {
+ row0 := TestRow{A: int64(100), B: int32(1)}
+ row1 := TestRow{A: int64(100), B: int32(2)}
+ row2 := TestRow{A: int64(100), B: int32(3)}
+ row3 := TestRow{A: int64(200), B: int32(4)}
+
+ p, s := beam.NewPipelineWithRoot()
+
+ input := beam.Create(s, row0, row1, row3)
+ outCol := dataframe.Transform(s, "lambda df: df.groupby('a').sum()",
input, reflect.TypeOf((*TestRow)(nil)).Elem(),
dataframe.WithExpansionAddr(expansionAddr), dataframe.WithIndexes())
+
+ passert.Equals(s, outCol, row2, row3)
+ return p
+}
diff --git
a/sdks/go/test/integration/transforms/xlang/dataframe/dataframe_test.go
b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe_test.go
new file mode 100644
index 00000000000..4d73e7dc931
--- /dev/null
+++ b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe_test.go
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataframe
+
+import (
+ "flag"
+ "log"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+ "github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+var expansionAddr string // Populate with expansion address labelled
"python_transform".
+
+func checkFlags(t *testing.T) {
+ if expansionAddr == "" {
+ t.Skip("No python transform expansion address provided.")
+ }
+}
+
+func TestDataframe(t *testing.T) {
+ integration.CheckFilters(t)
+ checkFlags(t)
+ p := DataframeTransform(expansionAddr)
+ ptest.RunAndValidate(t, p)
+}
+
+func TestMain(m *testing.M) {
+ flag.Parse()
+ beam.Init()
+
+ services := integration.NewExpansionServices()
+ defer func() { services.Shutdown() }()
+ addr, err := services.GetAddr("python_transform")
+ if err != nil {
+ log.Printf("skipping missing expansion service: %v", err)
+ } else {
+ expansionAddr = addr
+ }
+
+ ptest.MainRet(m)
+}