This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 89cd2383859 [prism] Regs for filter and stats transform tests. (#27586)
89cd2383859 is described below
commit 89cd238385923b033f564c058e00b4a8c9bd0d81
Author: Robert Burke <[email protected]>
AuthorDate: Fri Jul 21 06:59:02 2023 -0700
[prism] Regs for filter and stats transform tests. (#27586)
Co-authored-by: lostluck <[email protected]>
---
sdks/go/pkg/beam/transforms/filter/filter.go | 10 +-
sdks/go/pkg/beam/transforms/filter/filter.shims.go | 201 ---------------------
sdks/go/pkg/beam/transforms/filter/filter_test.go | 17 ++
sdks/go/pkg/beam/transforms/stats/count_test.go | 9 +
sdks/go/pkg/beam/transforms/stats/max_test.go | 6 +
sdks/go/pkg/beam/transforms/stats/quantiles.go | 8 +-
.../go/pkg/beam/transforms/stats/quantiles_test.go | 51 ++----
7 files changed, 58 insertions(+), 244 deletions(-)
diff --git a/sdks/go/pkg/beam/transforms/filter/filter.go b/sdks/go/pkg/beam/transforms/filter/filter.go
index 913e7355c30..997eec5eb4e 100644
--- a/sdks/go/pkg/beam/transforms/filter/filter.go
+++ b/sdks/go/pkg/beam/transforms/filter/filter.go
@@ -21,11 +21,15 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)
-//go:generate go install github.com/apache/beam/sdks/v2/go/cmd/starcgen
-//go:generate starcgen --package=filter --identifiers=filterFn,mapFn,mergeFn
-//go:generate go fmt
+func init() {
+ register.DoFn2x0[beam.T, func(beam.T)]((*filterFn)(nil))
+ register.Function1x2(mapFn)
+ register.Function2x1(mergeFn)
+ register.Emitter1[beam.T]()
+}
var (
sig = funcx.MakePredicate(beam.TType) // T -> bool
diff --git a/sdks/go/pkg/beam/transforms/filter/filter.shims.go b/sdks/go/pkg/beam/transforms/filter/filter.shims.go
deleted file mode 100644
index b0d18233ab1..00000000000
--- a/sdks/go/pkg/beam/transforms/filter/filter.shims.go
+++ /dev/null
@@ -1,201 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Code generated by starcgen. DO NOT EDIT.
-// File: filter.shims.go
-
-package filter
-
-import (
- "context"
- "reflect"
-
- // Library imports
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
-)
-
-func init() {
- runtime.RegisterFunction(mapFn)
- runtime.RegisterFunction(mergeFn)
- runtime.RegisterType(reflect.TypeOf((*filterFn)(nil)).Elem())
- schema.RegisterType(reflect.TypeOf((*filterFn)(nil)).Elem())
- reflectx.RegisterStructWrapper(reflect.TypeOf((*filterFn)(nil)).Elem(), wrapMakerFilterFn)
- reflectx.RegisterFunc(reflect.TypeOf((*func(int, int) int)(nil)).Elem(), funcMakerIntIntГInt)
- reflectx.RegisterFunc(reflect.TypeOf((*func(typex.T, func(typex.T)))(nil)).Elem(), funcMakerTypex۰TEmitTypex۰TГ)
- reflectx.RegisterFunc(reflect.TypeOf((*func(typex.T) (typex.T, int))(nil)).Elem(), funcMakerTypex۰TГTypex۰TInt)
- reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), funcMakerГ)
- exec.RegisterEmitter(reflect.TypeOf((*func(typex.T))(nil)).Elem(), emitMakerTypex۰T)
-}
-
-func wrapMakerFilterFn(fn any) map[string]reflectx.Func {
- dfn := fn.(*filterFn)
- return map[string]reflectx.Func{
- "ProcessElement": reflectx.MakeFunc(func(a0 typex.T, a1 func(typex.T)) { dfn.ProcessElement(a0, a1) }),
- "Setup": reflectx.MakeFunc(func() { dfn.Setup() }),
- }
-}
-
-type callerIntIntГInt struct {
- fn func(int, int) int
-}
-
-func funcMakerIntIntГInt(fn any) reflectx.Func {
- f := fn.(func(int, int) int)
- return &callerIntIntГInt{fn: f}
-}
-
-func (c *callerIntIntГInt) Name() string {
- return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerIntIntГInt) Type() reflect.Type {
- return reflect.TypeOf(c.fn)
-}
-
-func (c *callerIntIntГInt) Call(args []any) []any {
- out0 := c.fn(args[0].(int), args[1].(int))
- return []any{out0}
-}
-
-func (c *callerIntIntГInt) Call2x1(arg0, arg1 any) any {
- return c.fn(arg0.(int), arg1.(int))
-}
-
-type callerTypex۰TEmitTypex۰TГ struct {
- fn func(typex.T, func(typex.T))
-}
-
-func funcMakerTypex۰TEmitTypex۰TГ(fn any) reflectx.Func {
- f := fn.(func(typex.T, func(typex.T)))
- return &callerTypex۰TEmitTypex۰TГ{fn: f}
-}
-
-func (c *callerTypex۰TEmitTypex۰TГ) Name() string {
- return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerTypex۰TEmitTypex۰TГ) Type() reflect.Type {
- return reflect.TypeOf(c.fn)
-}
-
-func (c *callerTypex۰TEmitTypex۰TГ) Call(args []any) []any {
- c.fn(args[0].(typex.T), args[1].(func(typex.T)))
- return []any{}
-}
-
-func (c *callerTypex۰TEmitTypex۰TГ) Call2x0(arg0, arg1 any) {
- c.fn(arg0.(typex.T), arg1.(func(typex.T)))
-}
-
-type callerTypex۰TГTypex۰TInt struct {
- fn func(typex.T) (typex.T, int)
-}
-
-func funcMakerTypex۰TГTypex۰TInt(fn any) reflectx.Func {
- f := fn.(func(typex.T) (typex.T, int))
- return &callerTypex۰TГTypex۰TInt{fn: f}
-}
-
-func (c *callerTypex۰TГTypex۰TInt) Name() string {
- return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerTypex۰TГTypex۰TInt) Type() reflect.Type {
- return reflect.TypeOf(c.fn)
-}
-
-func (c *callerTypex۰TГTypex۰TInt) Call(args []any) []any {
- out0, out1 := c.fn(args[0].(typex.T))
- return []any{out0, out1}
-}
-
-func (c *callerTypex۰TГTypex۰TInt) Call1x2(arg0 any) (any, any) {
- return c.fn(arg0.(typex.T))
-}
-
-type callerГ struct {
- fn func()
-}
-
-func funcMakerГ(fn any) reflectx.Func {
- f := fn.(func())
- return &callerГ{fn: f}
-}
-
-func (c *callerГ) Name() string {
- return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerГ) Type() reflect.Type {
- return reflect.TypeOf(c.fn)
-}
-
-func (c *callerГ) Call(args []any) []any {
- c.fn()
- return []any{}
-}
-
-func (c *callerГ) Call0x0() {
- c.fn()
-}
-
-type emitNative struct {
- n exec.ElementProcessor
- fn any
- est *sdf.WatermarkEstimator
-
- ctx context.Context
- ws []typex.Window
- et typex.EventTime
- value exec.FullValue
-}
-
-func (e *emitNative) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
- e.ctx = ctx
- e.ws = ws
- e.et = et
- return nil
-}
-
-func (e *emitNative) Value() any {
- return e.fn
-}
-
-func (e *emitNative) AttachEstimator(est *sdf.WatermarkEstimator) {
- e.est = est
-}
-
-func emitMakerTypex۰T(n exec.ElementProcessor) exec.ReusableEmitter {
- ret := &emitNative{n: n}
- ret.fn = ret.invokeTypex۰T
- return ret
-}
-
-func (e *emitNative) invokeTypex۰T(val typex.T) {
- e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val}
- if e.est != nil {
- (*e.est).(sdf.TimestampObservingEstimator).ObserveTimestamp(e.et.ToTime())
- }
- if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
- panic(err)
- }
-}
-
-// DO NOT MODIFY: GENERATED CODE
diff --git a/sdks/go/pkg/beam/transforms/filter/filter_test.go b/sdks/go/pkg/beam/transforms/filter/filter_test.go
index 9cc5a526af9..96b4cbe12d7 100644
--- a/sdks/go/pkg/beam/transforms/filter/filter_test.go
+++ b/sdks/go/pkg/beam/transforms/filter/filter_test.go
@@ -18,11 +18,28 @@ package filter_test
import (
"testing"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
)
+func TestMain(m *testing.M) {
+ ptest.Main(m)
+}
+
+func init() {
+ register.Function1x1(alwaysTrue)
+ register.Function1x1(alwaysFalse)
+ register.Function1x1(isOne)
+ register.Function1x1(greaterThanOne)
+}
+
+func alwaysTrue(a int) bool { return true }
+func alwaysFalse(a int) bool { return false }
+func isOne(a int) bool { return a == 1 }
+func greaterThanOne(a int) bool { return a > 1 }
+
func TestInclude(t *testing.T) {
tests := []struct {
in []int
diff --git a/sdks/go/pkg/beam/transforms/stats/count_test.go b/sdks/go/pkg/beam/transforms/stats/count_test.go
index 23627a92f79..be6ce950e20 100644
--- a/sdks/go/pkg/beam/transforms/stats/count_test.go
+++ b/sdks/go/pkg/beam/transforms/stats/count_test.go
@@ -20,10 +20,19 @@ import (
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
+func TestMain(m *testing.M) {
+ ptest.Main(m)
+}
+
+func init() {
+ register.Function2x1(kvToCount)
+}
+
type count struct {
Elm int
Count int
diff --git a/sdks/go/pkg/beam/transforms/stats/max_test.go b/sdks/go/pkg/beam/transforms/stats/max_test.go
index af817527dc9..531792e70f5 100644
--- a/sdks/go/pkg/beam/transforms/stats/max_test.go
+++ b/sdks/go/pkg/beam/transforms/stats/max_test.go
@@ -19,10 +19,16 @@ import (
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
+func init() {
+ register.Function2x1(kvToStudent)
+ register.Function1x2(studentToKV)
+}
+
type student struct {
Name string
Grade float64
diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles.go b/sdks/go/pkg/beam/transforms/stats/quantiles.go
index 79a66b58e1f..6d2baa8b5e9 100644
--- a/sdks/go/pkg/beam/transforms/stats/quantiles.go
+++ b/sdks/go/pkg/beam/transforms/stats/quantiles.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)
func init() {
@@ -44,6 +45,9 @@ func init() {
beam.RegisterType(reflect.TypeOf((*shardElementsFn)(nil)).Elem())
beam.RegisterCoder(compactorsType, encodeCompactors, decodeCompactors)
beam.RegisterCoder(weightedElementType, encodeWeightedElement, decodeWeightedElement)
+
+ register.Function1x2(fixedKey)
+ register.Function2x1(makeWeightedElement)
}
// Opts contains settings used to configure how approximate quantiles are computed.
@@ -663,12 +667,14 @@ func makeWeightedElement(weight int, element beam.T) weightedElement {
return weightedElement{weight: weight, element: element}
}
+func fixedKey(e beam.T) (int, beam.T) { return 1, e }
+
// ApproximateQuantiles computes approximate quantiles for the input PCollection<T>.
//
// The output PCollection contains a single element: a list of numQuantiles - 1 elements approximately splitting up the input collection into numQuantiles separate quantiles.
// For example, if numQuantiles = 2, the returned list would contain a single element such that approximately half of the input would be less than that element and half would be greater.
func ApproximateQuantiles(s beam.Scope, pc beam.PCollection, less any, opts Opts) beam.PCollection {
- return ApproximateWeightedQuantiles(s, beam.ParDo(s, func(e beam.T) (int, beam.T) { return 1, e }, pc), less, opts)
+ return ApproximateWeightedQuantiles(s, beam.ParDo(s, fixedKey, pc), less, opts)
}
// reduce takes a PCollection<weightedElementWrapper> and returns a PCollection<*compactors>. The output PCollection may have at most shardSizes[len(shardSizes) - 1] compactors.
diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles_test.go b/sdks/go/pkg/beam/transforms/stats/quantiles_test.go
index c03620d0b9b..1e389eed128 100644
--- a/sdks/go/pkg/beam/transforms/stats/quantiles_test.go
+++ b/sdks/go/pkg/beam/transforms/stats/quantiles_test.go
@@ -16,46 +16,19 @@
package stats
import (
- "reflect"
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/google/go-cmp/cmp"
)
func init() {
- beam.RegisterFunction(weightedElementToKv)
-
- // In practice, this runs faster than plain reflection.
- // TODO(https://github.com/apache/beam/issues/20271): Remove once collisions don't occur for starcgen over test code and an equivalent is generated for us.
- reflectx.RegisterFunc(reflect.ValueOf(less).Type(), func(_ any) reflectx.Func {
- return newIntLess()
- })
-}
-
-type intLess struct {
- name string
- t reflect.Type
-}
-
-func newIntLess() *intLess {
- return &intLess{
- name: reflectx.FunctionName(reflect.ValueOf(less).Interface()),
- t: reflect.ValueOf(less).Type(),
- }
-}
-
-func (i *intLess) Name() string {
- return i.name
-}
-func (i *intLess) Type() reflect.Type {
- return i.t
-}
-func (i *intLess) Call(args []any) []any {
- return []any{args[0].(int) < args[1].(int)}
+ register.Function1x2(weightedElementToKv)
+ register.Function2x1(less)
}
func less(a, b int) bool {
@@ -68,7 +41,7 @@ func TestLargeQuantiles(t *testing.T) {
for i := 0; i < numElements; i++ {
inputSlice = append(inputSlice, i)
}
- p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{[]int{10006, 19973}})
+ p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{{10006, 19973}})
quantiles := ApproximateQuantiles(s, input, less, Opts{
K: 200,
NumQuantiles: 3,
@@ -85,7 +58,7 @@ func TestLargeQuantilesReversed(t *testing.T) {
for i := numElements - 1; i >= 0; i-- {
inputSlice = append(inputSlice, i)
}
- p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{[]int{9985, 19959}})
+ p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{{9985, 19959}})
quantiles := ApproximateQuantiles(s, input, less, Opts{
K: 200,
NumQuantiles: 3,
@@ -103,8 +76,8 @@ func TestBasicQuantiles(t *testing.T) {
Expected [][]int
}{
{[]int{}, [][]int{}},
- {[]int{1}, [][]int{[]int{1}}},
- {[]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, [][]int{[]int{6, 13}}},
+ {[]int{1}, [][]int{{1}}},
+ {[]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, [][]int{{6, 13}}},
}
for _, test := range tests {
@@ -180,7 +153,7 @@ func TestMerging(t *testing.T) {
K: 3,
NumberOfCompactions: 1,
Compactors: []compactor{{
- sorted: [][]beam.T{[]beam.T{1}, []beam.T{2}, []beam.T{3}},
+ sorted: [][]beam.T{{1}, {2}, {3}},
unsorted: []beam.T{6, 5, 4},
capacity: 4,
}},
@@ -191,7 +164,7 @@ func TestMerging(t *testing.T) {
NumberOfCompactions: 1,
Compactors: []compactor{
{
- sorted: [][]beam.T{[]beam.T{7}, []beam.T{8}, []beam.T{9}},
+ sorted: [][]beam.T{{7}, {8}, {9}},
unsorted: []beam.T{12, 11, 10},
capacity: 4},
},
@@ -205,7 +178,7 @@ func TestMerging(t *testing.T) {
Compactors: []compactor{
{capacity: 4},
{
- sorted: [][]beam.T{[]beam.T{1, 3, 5, 7, 9, 11}},
+ sorted: [][]beam.T{{1, 3, 5, 7, 9, 11}},
capacity: 4,
},
},
@@ -222,12 +195,12 @@ func TestCompactorsEncoding(t *testing.T) {
Compactors: []compactor{
{
capacity: 4,
- sorted: [][]beam.T{[]beam.T{1, 2}},
+ sorted: [][]beam.T{{1, 2}},
unsorted: []beam.T{3, 4},
},
{
capacity: 4,
- sorted: [][]beam.T{[]beam.T{5, 6}},
+ sorted: [][]beam.T{{5, 6}},
unsorted: []beam.T{7, 8},
},
},