[
https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=767312&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-767312
]
ASF GitHub Bot logged work on BEAM-14347:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 06/May/22 16:55
Start Date: 06/May/22 16:55
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17574:
URL: https://github.com/apache/beam/pull/17574#discussion_r867009227
##########
sdks/go/pkg/beam/registration/emitterIterRegistration.go:
##########
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package registration
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+type emitNative1[T any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative1[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative1[T]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative1[T]) invoke(val T) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative2[T1, T2 any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative2[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative2[T1, T2]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative2[T1, T2]) invoke(key T1, val T2) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key, Elm2: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative2WithTimestamp[T any] struct {
Review Comment:
That's a good point; I updated it accordingly.
##########
sdks/go/pkg/beam/registration/emitterIterRegistration.go:
##########
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package registration
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+type emitNative1[T any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative1[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative1[T]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative1[T]) invoke(val T) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative2[T1, T2 any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative2[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative2[T1, T2]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative2[T1, T2]) invoke(key T1, val T2) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key, Elm2: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative2WithTimestamp[T any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative2WithTimestamp[T]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative2WithTimestamp[T]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative2WithTimestamp[T]) invoke(et typex.EventTime, val T) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+type emitNative3[T1, T2 any] struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative3[T1, T2]) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative3[T1, T2]) Value() interface{} {
+ return e.fn
+}
+
+func (e *emitNative3[T1, T2]) invoke(et typex.EventTime, key T1, val T2) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: key, Elm2: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+// RegisterEmitter1 registers parameters from your DoFn with a
+// signature func(T) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter1[T]((*func(T))(nil))
+func RegisterEmitter1[T1 any](e *func(T1)) {
+ registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
+ gen := &emitNative1[T1]{n: n}
+ gen.fn = gen.invoke
+ return gen
+ }
+ exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+}
+
+// RegisterEmitter2 registers parameters from your DoFn with a
+// signature func(T1, T2) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter2[T1, T2]((*func(T1, T2))(nil))
Review Comment:
I think function registration already takes care of that check, so I'd prefer
to let it handle this rather than duplicating the effort here. It has more
context to return a good error anyway, and it is guaranteed to get called
(unlike this function).
Issue Time Tracking
-------------------
Worklog Id: (was: 767312)
Time Spent: 7h 10m (was: 7h)
> [Go SDK] Allow users to optimize DoFns with a single generic registration function
> ----------------------------------------------------------------------------------
>
> Key: BEAM-14347
> URL: https://issues.apache.org/jira/browse/BEAM-14347
> Project: Beam
> Issue Type: New Feature
> Components: sdk-go
> Reporter: Danny McCormick
> Assignee: Danny McCormick
> Priority: P2
> Time Spent: 7h 10m
> Remaining Estimate: 0h
>
> Right now, to optimize DoFn execution, users have to use the code generator.
> This change allows them to use generics instead.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)