[ 
https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=767312&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-767312
 ]

ASF GitHub Bot logged work on BEAM-14347:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/May/22 16:55
            Start Date: 06/May/22 16:55
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17574:
URL: https://github.com/apache/beam/pull/17574#discussion_r867009227


##########
sdks/go/pkg/beam/registration/emitterIterRegistration.go:
##########
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package registration
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+type emitNative1[T any] struct {
+       n  exec.ElementProcessor
+       fn interface{}
+
+       ctx   context.Context
+       ws    []typex.Window
+       et    typex.EventTime
+       value exec.FullValue
+}
+
+func (e *emitNative1[T]) Init(ctx context.Context, ws []typex.Window, et 
typex.EventTime) error {
+       e.ctx = ctx
+       e.ws = ws
+       e.et = et
+       return nil
+}
+
+func (e *emitNative1[T]) Value() interface{} {
+       return e.fn
+}
+
+func (e *emitNative1[T]) invoke(val T) {
+       e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val}
+       if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+               panic(err)
+       }
+}
+
+type emitNative2[T1, T2 any] struct {
+       n  exec.ElementProcessor
+       fn interface{}
+
+       ctx   context.Context
+       ws    []typex.Window
+       et    typex.EventTime
+       value exec.FullValue
+}
+
+func (e *emitNative2[T1, T2]) Init(ctx context.Context, ws []typex.Window, et 
typex.EventTime) error {
+       e.ctx = ctx
+       e.ws = ws
+       e.et = et
+       return nil
+}
+
+func (e *emitNative2[T1, T2]) Value() interface{} {
+       return e.fn
+}
+
+func (e *emitNative2[T1, T2]) invoke(key T1, val T2) {
+       e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key, 
Elm2: val}
+       if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+               panic(err)
+       }
+}
+
+type emitNative2WithTimestamp[T any] struct {

Review Comment:
   That's a good point, I updated accordingly



##########
sdks/go/pkg/beam/registration/emitterIterRegistration.go:
##########
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package registration
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+type emitNative1[T any] struct {
+       n  exec.ElementProcessor
+       fn interface{}
+
+       ctx   context.Context
+       ws    []typex.Window
+       et    typex.EventTime
+       value exec.FullValue
+}
+
+func (e *emitNative1[T]) Init(ctx context.Context, ws []typex.Window, et 
typex.EventTime) error {
+       e.ctx = ctx
+       e.ws = ws
+       e.et = et
+       return nil
+}
+
+func (e *emitNative1[T]) Value() interface{} {
+       return e.fn
+}
+
+func (e *emitNative1[T]) invoke(val T) {
+       e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val}
+       if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+               panic(err)
+       }
+}
+
+type emitNative2[T1, T2 any] struct {
+       n  exec.ElementProcessor
+       fn interface{}
+
+       ctx   context.Context
+       ws    []typex.Window
+       et    typex.EventTime
+       value exec.FullValue
+}
+
+func (e *emitNative2[T1, T2]) Init(ctx context.Context, ws []typex.Window, et 
typex.EventTime) error {
+       e.ctx = ctx
+       e.ws = ws
+       e.et = et
+       return nil
+}
+
+func (e *emitNative2[T1, T2]) Value() interface{} {
+       return e.fn
+}
+
+func (e *emitNative2[T1, T2]) invoke(key T1, val T2) {
+       e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key, 
Elm2: val}
+       if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+               panic(err)
+       }
+}
+
+type emitNative2WithTimestamp[T any] struct {
+       n  exec.ElementProcessor
+       fn interface{}
+
+       ctx   context.Context
+       ws    []typex.Window
+       et    typex.EventTime
+       value exec.FullValue
+}
+
+func (e *emitNative2WithTimestamp[T]) Init(ctx context.Context, ws 
[]typex.Window, et typex.EventTime) error {
+       e.ctx = ctx
+       e.ws = ws
+       e.et = et
+       return nil
+}
+
+func (e *emitNative2WithTimestamp[T]) Value() interface{} {
+       return e.fn
+}
+
+func (e *emitNative2WithTimestamp[T]) invoke(et typex.EventTime, val T) {
+       e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: val}
+       if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+               panic(err)
+       }
+}
+
+type emitNative3[T1, T2 any] struct {
+       n  exec.ElementProcessor
+       fn interface{}
+
+       ctx   context.Context
+       ws    []typex.Window
+       et    typex.EventTime
+       value exec.FullValue
+}
+
+func (e *emitNative3[T1, T2]) Init(ctx context.Context, ws []typex.Window, et 
typex.EventTime) error {
+       e.ctx = ctx
+       e.ws = ws
+       e.et = et
+       return nil
+}
+
+func (e *emitNative3[T1, T2]) Value() interface{} {
+       return e.fn
+}
+
+func (e *emitNative3[T1, T2]) invoke(et typex.EventTime, key T1, val T2) {
+       e.value = exec.FullValue{Windows: e.ws, Timestamp: et, Elm: key, Elm2: 
val}
+       if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+               panic(err)
+       }
+}
+
+// RegisterEmitter1 registers parameters from your DoFn with a
+// signature func(T) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter1[T]((*func(T))(nil))
+func RegisterEmitter1[T1 any](e *func(T1)) {
+       registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
+               gen := &emitNative1[T1]{n: n}
+               gen.fn = gen.invoke
+               return gen
+       }
+       exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+}
+
+// RegisterEmitter2 registers parameters from your DoFn with a
+// signature func(T1, T2) and optimizes their execution.
+// This must be done by passing in a reference to an instantiated version
+// of the function, aka:
+// registration.RegisterEmitter2[T1, T2]((*func(T1, T2))(nil))

Review Comment:
   I think function registration takes care of that, so I'd probably prefer to 
let that handle it rather than duplicating the effort - it has more context to 
return a good error anyways, and is guaranteed to get called (unlike this 
function)





Issue Time Tracking
-------------------

    Worklog Id:     (was: 767312)
    Time Spent: 7h 10m  (was: 7h)

> [Go SDK] Allow users to optimize DoFns with a single generic registration 
> function
> ----------------------------------------------------------------------------------
>
>                 Key: BEAM-14347
>                 URL: https://issues.apache.org/jira/browse/BEAM-14347
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: Danny McCormick
>            Assignee: Danny McCormick
>            Priority: P2
>          Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> Right now, to optimize DoFn execution, users have to use the code generator. 
> This updates to allow them to use generics instead.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to