This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 89cd2383859 [prism] Regs for filter and stats transform tests. (#27586)
89cd2383859 is described below

commit 89cd238385923b033f564c058e00b4a8c9bd0d81
Author: Robert Burke <[email protected]>
AuthorDate: Fri Jul 21 06:59:02 2023 -0700

    [prism] Regs for filter and stats transform tests. (#27586)
    
    Co-authored-by: lostluck <[email protected]>
---
 sdks/go/pkg/beam/transforms/filter/filter.go       |  10 +-
 sdks/go/pkg/beam/transforms/filter/filter.shims.go | 201 ---------------------
 sdks/go/pkg/beam/transforms/filter/filter_test.go  |  17 ++
 sdks/go/pkg/beam/transforms/stats/count_test.go    |   9 +
 sdks/go/pkg/beam/transforms/stats/max_test.go      |   6 +
 sdks/go/pkg/beam/transforms/stats/quantiles.go     |   8 +-
 .../go/pkg/beam/transforms/stats/quantiles_test.go |  51 ++----
 7 files changed, 58 insertions(+), 244 deletions(-)

diff --git a/sdks/go/pkg/beam/transforms/filter/filter.go b/sdks/go/pkg/beam/transforms/filter/filter.go
index 913e7355c30..997eec5eb4e 100644
--- a/sdks/go/pkg/beam/transforms/filter/filter.go
+++ b/sdks/go/pkg/beam/transforms/filter/filter.go
@@ -21,11 +21,15 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 )
 
-//go:generate go install github.com/apache/beam/sdks/v2/go/cmd/starcgen
-//go:generate starcgen --package=filter --identifiers=filterFn,mapFn,mergeFn
-//go:generate go fmt
+func init() {
+       register.DoFn2x0[beam.T, func(beam.T)]((*filterFn)(nil))
+       register.Function1x2(mapFn)
+       register.Function2x1(mergeFn)
+       register.Emitter1[beam.T]()
+}
 
 var (
        sig = funcx.MakePredicate(beam.TType) // T -> bool
diff --git a/sdks/go/pkg/beam/transforms/filter/filter.shims.go b/sdks/go/pkg/beam/transforms/filter/filter.shims.go
deleted file mode 100644
index b0d18233ab1..00000000000
--- a/sdks/go/pkg/beam/transforms/filter/filter.shims.go
+++ /dev/null
@@ -1,201 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Code generated by starcgen. DO NOT EDIT.
-// File: filter.shims.go
-
-package filter
-
-import (
-       "context"
-       "reflect"
-
-       // Library imports
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
-)
-
-func init() {
-       runtime.RegisterFunction(mapFn)
-       runtime.RegisterFunction(mergeFn)
-       runtime.RegisterType(reflect.TypeOf((*filterFn)(nil)).Elem())
-       schema.RegisterType(reflect.TypeOf((*filterFn)(nil)).Elem())
-       reflectx.RegisterStructWrapper(reflect.TypeOf((*filterFn)(nil)).Elem(), wrapMakerFilterFn)
-       reflectx.RegisterFunc(reflect.TypeOf((*func(int, int) int)(nil)).Elem(), funcMakerIntIntГInt)
-       reflectx.RegisterFunc(reflect.TypeOf((*func(typex.T, func(typex.T)))(nil)).Elem(), funcMakerTypex۰TEmitTypex۰TГ)
-       reflectx.RegisterFunc(reflect.TypeOf((*func(typex.T) (typex.T, int))(nil)).Elem(), funcMakerTypex۰TГTypex۰TInt)
-       reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), funcMakerГ)
-       exec.RegisterEmitter(reflect.TypeOf((*func(typex.T))(nil)).Elem(), emitMakerTypex۰T)
-}
-
-func wrapMakerFilterFn(fn any) map[string]reflectx.Func {
-       dfn := fn.(*filterFn)
-       return map[string]reflectx.Func{
-               "ProcessElement": reflectx.MakeFunc(func(a0 typex.T, a1 func(typex.T)) { dfn.ProcessElement(a0, a1) }),
-               "Setup":          reflectx.MakeFunc(func() { dfn.Setup() }),
-       }
-}
-
-type callerIntIntГInt struct {
-       fn func(int, int) int
-}
-
-func funcMakerIntIntГInt(fn any) reflectx.Func {
-       f := fn.(func(int, int) int)
-       return &callerIntIntГInt{fn: f}
-}
-
-func (c *callerIntIntГInt) Name() string {
-       return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerIntIntГInt) Type() reflect.Type {
-       return reflect.TypeOf(c.fn)
-}
-
-func (c *callerIntIntГInt) Call(args []any) []any {
-       out0 := c.fn(args[0].(int), args[1].(int))
-       return []any{out0}
-}
-
-func (c *callerIntIntГInt) Call2x1(arg0, arg1 any) any {
-       return c.fn(arg0.(int), arg1.(int))
-}
-
-type callerTypex۰TEmitTypex۰TГ struct {
-       fn func(typex.T, func(typex.T))
-}
-
-func funcMakerTypex۰TEmitTypex۰TГ(fn any) reflectx.Func {
-       f := fn.(func(typex.T, func(typex.T)))
-       return &callerTypex۰TEmitTypex۰TГ{fn: f}
-}
-
-func (c *callerTypex۰TEmitTypex۰TГ) Name() string {
-       return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerTypex۰TEmitTypex۰TГ) Type() reflect.Type {
-       return reflect.TypeOf(c.fn)
-}
-
-func (c *callerTypex۰TEmitTypex۰TГ) Call(args []any) []any {
-       c.fn(args[0].(typex.T), args[1].(func(typex.T)))
-       return []any{}
-}
-
-func (c *callerTypex۰TEmitTypex۰TГ) Call2x0(arg0, arg1 any) {
-       c.fn(arg0.(typex.T), arg1.(func(typex.T)))
-}
-
-type callerTypex۰TГTypex۰TInt struct {
-       fn func(typex.T) (typex.T, int)
-}
-
-func funcMakerTypex۰TГTypex۰TInt(fn any) reflectx.Func {
-       f := fn.(func(typex.T) (typex.T, int))
-       return &callerTypex۰TГTypex۰TInt{fn: f}
-}
-
-func (c *callerTypex۰TГTypex۰TInt) Name() string {
-       return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerTypex۰TГTypex۰TInt) Type() reflect.Type {
-       return reflect.TypeOf(c.fn)
-}
-
-func (c *callerTypex۰TГTypex۰TInt) Call(args []any) []any {
-       out0, out1 := c.fn(args[0].(typex.T))
-       return []any{out0, out1}
-}
-
-func (c *callerTypex۰TГTypex۰TInt) Call1x2(arg0 any) (any, any) {
-       return c.fn(arg0.(typex.T))
-}
-
-type callerГ struct {
-       fn func()
-}
-
-func funcMakerГ(fn any) reflectx.Func {
-       f := fn.(func())
-       return &callerГ{fn: f}
-}
-
-func (c *callerГ) Name() string {
-       return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerГ) Type() reflect.Type {
-       return reflect.TypeOf(c.fn)
-}
-
-func (c *callerГ) Call(args []any) []any {
-       c.fn()
-       return []any{}
-}
-
-func (c *callerГ) Call0x0() {
-       c.fn()
-}
-
-type emitNative struct {
-       n   exec.ElementProcessor
-       fn  any
-       est *sdf.WatermarkEstimator
-
-       ctx   context.Context
-       ws    []typex.Window
-       et    typex.EventTime
-       value exec.FullValue
-}
-
-func (e *emitNative) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
-       e.ctx = ctx
-       e.ws = ws
-       e.et = et
-       return nil
-}
-
-func (e *emitNative) Value() any {
-       return e.fn
-}
-
-func (e *emitNative) AttachEstimator(est *sdf.WatermarkEstimator) {
-       e.est = est
-}
-
-func emitMakerTypex۰T(n exec.ElementProcessor) exec.ReusableEmitter {
-       ret := &emitNative{n: n}
-       ret.fn = ret.invokeTypex۰T
-       return ret
-}
-
-func (e *emitNative) invokeTypex۰T(val typex.T) {
-       e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val}
-       if e.est != nil {
-               (*e.est).(sdf.TimestampObservingEstimator).ObserveTimestamp(e.et.ToTime())
-       }
-       if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
-               panic(err)
-       }
-}
-
-// DO NOT MODIFY: GENERATED CODE
diff --git a/sdks/go/pkg/beam/transforms/filter/filter_test.go b/sdks/go/pkg/beam/transforms/filter/filter_test.go
index 9cc5a526af9..96b4cbe12d7 100644
--- a/sdks/go/pkg/beam/transforms/filter/filter_test.go
+++ b/sdks/go/pkg/beam/transforms/filter/filter_test.go
@@ -18,11 +18,28 @@ package filter_test
 import (
        "testing"
 
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
 )
 
+func TestMain(m *testing.M) {
+       ptest.Main(m)
+}
+
+func init() {
+       register.Function1x1(alwaysTrue)
+       register.Function1x1(alwaysFalse)
+       register.Function1x1(isOne)
+       register.Function1x1(greaterThanOne)
+}
+
+func alwaysTrue(a int) bool     { return true }
+func alwaysFalse(a int) bool    { return false }
+func isOne(a int) bool          { return a == 1 }
+func greaterThanOne(a int) bool { return a > 1 }
+
 func TestInclude(t *testing.T) {
        tests := []struct {
                in  []int
diff --git a/sdks/go/pkg/beam/transforms/stats/count_test.go b/sdks/go/pkg/beam/transforms/stats/count_test.go
index 23627a92f79..be6ce950e20 100644
--- a/sdks/go/pkg/beam/transforms/stats/count_test.go
+++ b/sdks/go/pkg/beam/transforms/stats/count_test.go
@@ -20,10 +20,19 @@ import (
        "testing"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
 )
 
+func TestMain(m *testing.M) {
+       ptest.Main(m)
+}
+
+func init() {
+       register.Function2x1(kvToCount)
+}
+
 type count struct {
        Elm   int
        Count int
diff --git a/sdks/go/pkg/beam/transforms/stats/max_test.go b/sdks/go/pkg/beam/transforms/stats/max_test.go
index af817527dc9..531792e70f5 100644
--- a/sdks/go/pkg/beam/transforms/stats/max_test.go
+++ b/sdks/go/pkg/beam/transforms/stats/max_test.go
@@ -19,10 +19,16 @@ import (
        "testing"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
 )
 
+func init() {
+       register.Function2x1(kvToStudent)
+       register.Function1x2(studentToKV)
+}
+
 type student struct {
        Name  string
        Grade float64
diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles.go b/sdks/go/pkg/beam/transforms/stats/quantiles.go
index 79a66b58e1f..6d2baa8b5e9 100644
--- a/sdks/go/pkg/beam/transforms/stats/quantiles.go
+++ b/sdks/go/pkg/beam/transforms/stats/quantiles.go
@@ -31,6 +31,7 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 )
 
 func init() {
@@ -44,6 +45,9 @@ func init() {
        beam.RegisterType(reflect.TypeOf((*shardElementsFn)(nil)).Elem())
        beam.RegisterCoder(compactorsType, encodeCompactors, decodeCompactors)
        beam.RegisterCoder(weightedElementType, encodeWeightedElement, decodeWeightedElement)
+
+       register.Function1x2(fixedKey)
+       register.Function2x1(makeWeightedElement)
 }
 
 // Opts contains settings used to configure how approximate quantiles are computed.
@@ -663,12 +667,14 @@ func makeWeightedElement(weight int, element beam.T) weightedElement {
        return weightedElement{weight: weight, element: element}
 }
 
+func fixedKey(e beam.T) (int, beam.T) { return 1, e }
+
 // ApproximateQuantiles computes approximate quantiles for the input PCollection<T>.
 //
 // The output PCollection contains a single element: a list of numQuantiles - 1 elements approximately splitting up the input collection into numQuantiles separate quantiles.
 // For example, if numQuantiles = 2, the returned list would contain a single element such that approximately half of the input would be less than that element and half would be greater.
 func ApproximateQuantiles(s beam.Scope, pc beam.PCollection, less any, opts Opts) beam.PCollection {
-       return ApproximateWeightedQuantiles(s, beam.ParDo(s, func(e beam.T) (int, beam.T) { return 1, e }, pc), less, opts)
+       return ApproximateWeightedQuantiles(s, beam.ParDo(s, fixedKey, pc), less, opts)
 }
 
 // reduce takes a PCollection<weightedElementWrapper> and returns a PCollection<*compactors>. The output PCollection may have at most shardSizes[len(shardSizes) - 1] compactors.
diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles_test.go b/sdks/go/pkg/beam/transforms/stats/quantiles_test.go
index c03620d0b9b..1e389eed128 100644
--- a/sdks/go/pkg/beam/transforms/stats/quantiles_test.go
+++ b/sdks/go/pkg/beam/transforms/stats/quantiles_test.go
@@ -16,46 +16,19 @@
 package stats
 
 import (
-       "reflect"
        "testing"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
        "github.com/google/go-cmp/cmp"
 )
 
 func init() {
-       beam.RegisterFunction(weightedElementToKv)
-
-       // In practice, this runs faster than plain reflection.
-       // TODO(https://github.com/apache/beam/issues/20271): Remove once collisions don't occur for starcgen over test code and an equivalent is generated for us.
-       reflectx.RegisterFunc(reflect.ValueOf(less).Type(), func(_ any) reflectx.Func {
-               return newIntLess()
-       })
-}
-
-type intLess struct {
-       name string
-       t    reflect.Type
-}
-
-func newIntLess() *intLess {
-       return &intLess{
-               name: reflectx.FunctionName(reflect.ValueOf(less).Interface()),
-               t:    reflect.ValueOf(less).Type(),
-       }
-}
-
-func (i *intLess) Name() string {
-       return i.name
-}
-func (i *intLess) Type() reflect.Type {
-       return i.t
-}
-func (i *intLess) Call(args []any) []any {
-       return []any{args[0].(int) < args[1].(int)}
+       register.Function1x2(weightedElementToKv)
+       register.Function2x1(less)
 }
 
 func less(a, b int) bool {
@@ -68,7 +41,7 @@ func TestLargeQuantiles(t *testing.T) {
        for i := 0; i < numElements; i++ {
                inputSlice = append(inputSlice, i)
        }
-       p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{[]int{10006, 19973}})
+       p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{{10006, 19973}})
        quantiles := ApproximateQuantiles(s, input, less, Opts{
                K:            200,
                NumQuantiles: 3,
@@ -85,7 +58,7 @@ func TestLargeQuantilesReversed(t *testing.T) {
        for i := numElements - 1; i >= 0; i-- {
                inputSlice = append(inputSlice, i)
        }
-       p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{[]int{9985, 19959}})
+       p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{{9985, 19959}})
        quantiles := ApproximateQuantiles(s, input, less, Opts{
                K:            200,
                NumQuantiles: 3,
@@ -103,8 +76,8 @@ func TestBasicQuantiles(t *testing.T) {
                Expected [][]int
        }{
                {[]int{}, [][]int{}},
-               {[]int{1}, [][]int{[]int{1}}},
-               {[]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, [][]int{[]int{6, 13}}},
+               {[]int{1}, [][]int{{1}}},
+               {[]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, [][]int{{6, 13}}},
        }
 
        for _, test := range tests {
@@ -180,7 +153,7 @@ func TestMerging(t *testing.T) {
                K:                   3,
                NumberOfCompactions: 1,
                Compactors: []compactor{{
-                       sorted:   [][]beam.T{[]beam.T{1}, []beam.T{2}, []beam.T{3}},
+                       sorted:   [][]beam.T{{1}, {2}, {3}},
                        unsorted: []beam.T{6, 5, 4},
                        capacity: 4,
                }},
@@ -191,7 +164,7 @@ func TestMerging(t *testing.T) {
                NumberOfCompactions: 1,
                Compactors: []compactor{
                        {
-                               sorted:   [][]beam.T{[]beam.T{7}, []beam.T{8}, []beam.T{9}},
+                               sorted:   [][]beam.T{{7}, {8}, {9}},
                                unsorted: []beam.T{12, 11, 10},
                                capacity: 4},
                },
@@ -205,7 +178,7 @@ func TestMerging(t *testing.T) {
                Compactors: []compactor{
                        {capacity: 4},
                        {
-                               sorted:   [][]beam.T{[]beam.T{1, 3, 5, 7, 9, 11}},
+                               sorted:   [][]beam.T{{1, 3, 5, 7, 9, 11}},
                                capacity: 4,
                        },
                },
@@ -222,12 +195,12 @@ func TestCompactorsEncoding(t *testing.T) {
                Compactors: []compactor{
                        {
                                capacity: 4,
-                               sorted:   [][]beam.T{[]beam.T{1, 2}},
+                               sorted:   [][]beam.T{{1, 2}},
                                unsorted: []beam.T{3, 4},
                        },
                        {
                                capacity: 4,
-                               sorted:   [][]beam.T{[]beam.T{5, 6}},
+                               sorted:   [][]beam.T{{5, 6}},
                                unsorted: []beam.T{7, 8},
                        },
                },

Reply via email to