lostluck commented on code in PR #17574: URL: https://github.com/apache/beam/pull/17574#discussion_r867202050
########## sdks/go/pkg/beam/registration/emitterIterRegistration.go: ########## @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registration + +import ( + "context" + "fmt" + "io" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +type emitNative1[T any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative1[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative1[T]) Value() interface{} { + return e.fn Review Comment: So the original generated code used "anchor" types of emitNative and iterNative, and added additional methods for whatever types were generated. Then the method appropriate to the factory function was assigned to the internal function pointer, to minimize the generated per type boilerplate somewhat. You can see this in the ["optimized" package](https://github.com/apache/beam/blob/51067c123204a4951de3cba858edc96da354292b/sdks/go/pkg/beam/core/runtime/exec/optimized/emitters.go#L1072). 
That package is quite legacy and handles the built-ins, but ultimately it would only train users to assume the built-ins are fast, and nothing else. All this work allows fast execution for everyone, and lets us train users for it properly. We don't need that approach anymore with generics. Instead, return `e.invoke` directly from `Value()`, instead of assigning it to an intermediate field. Then we can also get rid of the `fn` field. This applies here and throughout. It shouldn't be any worse either, since `interface{}` values would be copied by value anyway. ########## sdks/go/pkg/beam/registration/emitterIterRegistration.go: ########## @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more Review Comment: File name nit: feel free to split this into 2 files, emitters.go and iterators.go. Don't repeat the package name in the file name itself. ########## sdks/go/pkg/beam/registration/emitterIterRegistration.go: ########## @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package registration + +import ( + "context" + "fmt" + "io" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +type emitNative1[T any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative1[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative1[T]) Value() interface{} { + return e.fn +} + +func (e *emitNative1[T]) invoke(val T) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative2[T1, T2 any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative2[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative2[T1, T2]) Value() interface{} { + return e.fn +} + +func (e *emitNative2[T1, T2]) invoke(key T1, val T2) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key, Elm2: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative1WithTimestamp[T any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative1WithTimestamp[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative1WithTimestamp[T]) Value() interface{} { + return e.fn +} + +func (e *emitNative1WithTimestamp[T]) invoke(et typex.EventTime, val T) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: val} + if 
err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative2WithTimestamp[T1, T2 any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative2WithTimestamp[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative2WithTimestamp[T1, T2]) Value() interface{} { + return e.fn +} + +func (e *emitNative2WithTimestamp[T1, T2]) invoke(et typex.EventTime, key T1, val T2) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: key, Elm2: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +// RegisterEmitter1 registers parameters from your DoFn with a +// signature func(T) and optimizes their execution. +// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterEmitter1[T]((*func(T))(nil)) +func RegisterEmitter1[T1 any](e *func(T1)) { Review Comment: Please drop the Register prefix for all the functions here. As per your example, users will already have `registration` as a prefix. `registration.Emitter1` is more readable. Hmmm, it's also not too late to just call it the `register` package too... but that's definitely a change for another PR. I suspect this is a hangover from when it was in the `beam` package. ########## sdks/go/pkg/beam/registration/emitterIterRegistration.go: ########## @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registration + +import ( + "context" + "fmt" + "io" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +type emitNative1[T any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative1[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative1[T]) Value() interface{} { + return e.fn +} + +func (e *emitNative1[T]) invoke(val T) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative2[T1, T2 any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative2[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative2[T1, T2]) Value() interface{} { + return e.fn +} + +func (e *emitNative2[T1, T2]) invoke(key T1, val T2) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key, Elm2: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative1WithTimestamp[T any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + 
value exec.FullValue +} + +func (e *emitNative1WithTimestamp[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative1WithTimestamp[T]) Value() interface{} { + return e.fn +} + +func (e *emitNative1WithTimestamp[T]) invoke(et typex.EventTime, val T) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative2WithTimestamp[T1, T2 any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative2WithTimestamp[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative2WithTimestamp[T1, T2]) Value() interface{} { + return e.fn +} + +func (e *emitNative2WithTimestamp[T1, T2]) invoke(et typex.EventTime, key T1, val T2) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: key, Elm2: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +// RegisterEmitter1 registers parameters from your DoFn with a +// signature func(T) and optimizes their execution. +// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterEmitter1[T]((*func(T))(nil)) +func RegisterEmitter1[T1 any](e *func(T1)) { + registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter { + gen := &emitNative1[T1]{n: n} + gen.fn = gen.invoke + return gen + } + exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc) +} + +// RegisterEmitter2 registers parameters from your DoFn with a +// signature func(T1, T2) and optimizes their execution. 
+// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterEmitter2[T1, T2]((*func(T1, T2))(nil)) +func RegisterEmitter2[T1, T2 any](e *func(T1, T2)) { + registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter { + gen := &emitNative2[T1, T2]{n: n} + gen.fn = gen.invoke + return gen + } + if reflect.TypeOf(e).Elem().In(0) == typex.EventTimeType { + registerFunc = func(n exec.ElementProcessor) exec.ReusableEmitter { + gen := &emitNative1WithTimestamp[T2]{n: n} + gen.fn = gen.invoke + return gen + } + } + exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc) +} + +// RegisterEmitter3 registers parameters from your DoFn with a +// signature func(T1, T2, T3) and optimizes their execution. +// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterEmitter3[T1, T2, T3]((*func(T1, T2, T3))(nil)) +func RegisterEmitter3[T1, T2, T3 any](e *func(T1, T2, T3)) { + registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter { + gen := &emitNative2WithTimestamp[T2, T3]{n: n} + gen.fn = gen.invoke + return gen + } + exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc) +} + +type iterNative1[T any] struct { + s exec.ReStream + fn interface{} + + // cur is the "current" stream, if any. 
+ cur exec.Stream +} + +func (v *iterNative1[T]) Init() error { + cur, err := v.s.Open() + if err != nil { + return err + } + v.cur = cur + return nil +} + +func (v *iterNative1[T]) Value() interface{} { + return v.fn +} + +func (v *iterNative1[T]) Reset() error { + if err := v.cur.Close(); err != nil { + return err + } + v.cur = nil + return nil +} + +func (v *iterNative1[T]) invoke(value *T) bool { + elm, err := v.cur.Read() + if err != nil { + if err == io.EOF { + return false + } + panic(fmt.Sprintf("broken stream: %v", err)) + } + *value = elm.Elm.(T) + return true +} + +type iterNative2[T1, T2 any] struct { + s exec.ReStream + fn interface{} + + // cur is the "current" stream, if any. + cur exec.Stream +} + +func (v *iterNative2[T1, T2]) Init() error { + cur, err := v.s.Open() + if err != nil { + return err + } + v.cur = cur + return nil +} + +func (v *iterNative2[T1, T2]) Value() interface{} { + return v.fn +} + +func (v *iterNative2[T1, T2]) Reset() error { + if err := v.cur.Close(); err != nil { + return err + } + v.cur = nil + return nil +} + +func (v *iterNative2[T1, T2]) invoke(key *T1, value *T2) bool { + elm, err := v.cur.Read() + if err != nil { + if err == io.EOF { + return false + } + panic(fmt.Sprintf("broken stream: %v", err)) + } + *key = elm.Elm.(T1) + *value = elm.Elm2.(T2) + return true +} + +// RegisterIter1 registers parameters from your DoFn with a +// signature func(*T) bool and optimizes their execution. +// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterIter1[T]((*func(*T) bool)(nil)) +func RegisterIter1[T any](i *func(*T) bool) { Review Comment: We may consider adding something for Map Side Input functions `func(K) func(*V) bool` here and "re iterators" `func() func(*V) bool` (and the KV version). 
That's not critical for this pass, since we don't have the registration scaffolding for that set up, and I don't think there's a great need for it, because those typically have high RPC overhead anyway as they fetch & refetch the data. We can always benchmark and add them later if needed. The reflective versions are here: https://github.com/apache/beam/blob/51067c123204a4951de3cba858edc96da354292b/sdks/go/pkg/beam/core/runtime/exec/input.go#L71 ########## sdks/go/pkg/beam/registration/emitterIterRegistration.go: ########## @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package registration + +import ( + "context" + "fmt" + "io" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +type emitNative1[T any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative1[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative1[T]) Value() interface{} { + return e.fn +} + +func (e *emitNative1[T]) invoke(val T) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative2[T1, T2 any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative2[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative2[T1, T2]) Value() interface{} { + return e.fn +} + +func (e *emitNative2[T1, T2]) invoke(key T1, val T2) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key, Elm2: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative1WithTimestamp[T any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative1WithTimestamp[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative1WithTimestamp[T]) Value() interface{} { + return e.fn +} + +func (e *emitNative1WithTimestamp[T]) invoke(et typex.EventTime, val T) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: val} + if 
err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative2WithTimestamp[T1, T2 any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative2WithTimestamp[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative2WithTimestamp[T1, T2]) Value() interface{} { + return e.fn +} + +func (e *emitNative2WithTimestamp[T1, T2]) invoke(et typex.EventTime, key T1, val T2) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: key, Elm2: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +// RegisterEmitter1 registers parameters from your DoFn with a +// signature func(T) and optimizes their execution. +// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterEmitter1[T]((*func(T))(nil)) +func RegisterEmitter1[T1 any](e *func(T1)) { + registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter { + gen := &emitNative1[T1]{n: n} + gen.fn = gen.invoke + return gen + } + exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc) +} + +// RegisterEmitter2 registers parameters from your DoFn with a +// signature func(T1, T2) and optimizes their execution. 
+// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterEmitter2[T1, T2]((*func(T1, T2))(nil)) +func RegisterEmitter2[T1, T2 any](e *func(T1, T2)) { + registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter { + gen := &emitNative2[T1, T2]{n: n} + gen.fn = gen.invoke + return gen + } + if reflect.TypeOf(e).Elem().In(0) == typex.EventTimeType { + registerFunc = func(n exec.ElementProcessor) exec.ReusableEmitter { + gen := &emitNative1WithTimestamp[T2]{n: n} + gen.fn = gen.invoke + return gen + } + } + exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc) +} + +// RegisterEmitter3 registers parameters from your DoFn with a +// signature func(T1, T2, T3) and optimizes their execution. +// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterEmitter3[T1, T2, T3]((*func(T1, T2, T3))(nil)) +func RegisterEmitter3[T1, T2, T3 any](e *func(T1, T2, T3)) { + registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter { + gen := &emitNative2WithTimestamp[T2, T3]{n: n} + gen.fn = gen.invoke + return gen + } + exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc) +} + +type iterNative1[T any] struct { + s exec.ReStream + fn interface{} + + // cur is the "current" stream, if any. 
+ cur exec.Stream +} + +func (v *iterNative1[T]) Init() error { + cur, err := v.s.Open() + if err != nil { + return err + } + v.cur = cur + return nil +} + +func (v *iterNative1[T]) Value() interface{} { + return v.fn +} + +func (v *iterNative1[T]) Reset() error { + if err := v.cur.Close(); err != nil { + return err + } + v.cur = nil + return nil +} + +func (v *iterNative1[T]) invoke(value *T) bool { + elm, err := v.cur.Read() + if err != nil { + if err == io.EOF { + return false + } + panic(fmt.Sprintf("broken stream: %v", err)) + } + *value = elm.Elm.(T) + return true +} + +type iterNative2[T1, T2 any] struct { + s exec.ReStream + fn interface{} + + // cur is the "current" stream, if any. + cur exec.Stream +} + +func (v *iterNative2[T1, T2]) Init() error { + cur, err := v.s.Open() + if err != nil { + return err + } + v.cur = cur + return nil +} + +func (v *iterNative2[T1, T2]) Value() interface{} { + return v.fn +} + +func (v *iterNative2[T1, T2]) Reset() error { + if err := v.cur.Close(); err != nil { + return err + } + v.cur = nil + return nil +} + +func (v *iterNative2[T1, T2]) invoke(key *T1, value *T2) bool { + elm, err := v.cur.Read() + if err != nil { + if err == io.EOF { + return false + } + panic(fmt.Sprintf("broken stream: %v", err)) + } + *key = elm.Elm.(T1) + *value = elm.Elm2.(T2) + return true +} + +// RegisterIter1 registers parameters from your DoFn with a +// signature func(*T) bool and optimizes their execution. +// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterIter1[T]((*func(*T) bool)(nil)) +func RegisterIter1[T any](i *func(*T) bool) { + registerFunc := func(s exec.ReStream) exec.ReusableInput { + ret := &iterNative1[T]{s: s} + ret.fn = ret.invoke + return ret + } + exec.RegisterInput(reflect.TypeOf(i).Elem(), registerFunc) +} + +// RegisterIter1 registers parameters from your DoFn with a +// signature func(*T1, *T2) bool and optimizes their execution. 
+// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterIter2[T1, T2]((*func(*T1, *T2) bool)(nil)) Review Comment: What do you think about dropping the function type parameter? 1. Users aren't going to have an iterator/emitter around, so they'll need to specify the func pointer setup like you've got. 2. Even assuming type inference worked, it's still annoying to go through the awkward `(*blah)(nil)` dance. https://go.dev/play/p/quG3c5scpGQ Inference does seem to work, at least. We do get compile-time safety with this approach too, but it also prevents us from automatically supporting any "new" iterator or emitter formats in the future with the same registration calls. What do you think? ########## sdks/go/pkg/beam/registration/emitterIterRegistration.go: ########## @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package registration + +import ( + "context" + "fmt" + "io" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +type emitNative1[T any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative1[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative1[T]) Value() interface{} { + return e.fn +} + +func (e *emitNative1[T]) invoke(val T) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative2[T1, T2 any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative2[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative2[T1, T2]) Value() interface{} { + return e.fn +} + +func (e *emitNative2[T1, T2]) invoke(key T1, val T2) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key, Elm2: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative2WithTimestamp[T any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative2WithTimestamp[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative2WithTimestamp[T]) Value() interface{} { + return e.fn +} + +func (e *emitNative2WithTimestamp[T]) invoke(et typex.EventTime, val T) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: val} + if 
err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +type emitNative3[T1, T2 any] struct { + n exec.ElementProcessor + fn interface{} + + ctx context.Context + ws []typex.Window + et typex.EventTime + value exec.FullValue +} + +func (e *emitNative3[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { + e.ctx = ctx + e.ws = ws + e.et = et + return nil +} + +func (e *emitNative3[T1, T2]) Value() interface{} { + return e.fn +} + +func (e *emitNative3[T1, T2]) invoke(et typex.EventTime, key T1, val T2) { + e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: key, Elm2: val} + if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { + panic(err) + } +} + +// RegisterEmitter1 registers parameters from your DoFn with a +// signature func(T) and optimizes their execution. +// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterEmitter1[T]((*func(T))(nil)) +func RegisterEmitter1[T1 any](e *func(T1)) { + registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter { + gen := &emitNative1[T1]{n: n} + gen.fn = gen.invoke + return gen + } + exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc) +} + +// RegisterEmitter2 registers parameters from your DoFn with a +// signature func(T1, T2) and optimizes their execution. +// This must be done by passing in a reference to an instantiated version +// of the function, aka: +// registration.RegisterEmitter2[T1, T2]((*func(T1, T2))(nil)) Review Comment: +1 to Danny's comment. It's not necessary to duplicate the analysis. Users *could* do things that aren't useful to them, but in practice likely won't. The big risk with these is that users change types and such, and end up not registering new ones. But we can ultimately give them a clear non-failing message for all the types their pipeline uses that are missing with code to copy paste, for the perf boost. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
