lostluck commented on code in PR #30815: URL: https://github.com/apache/beam/pull/30815#discussion_r1607068419
########## sdks/go/pkg/beam/io/webapi/webapi.go: ########## @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package webapi + +import ( + "bytes" + "context" + "fmt" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "reflect" +) + +func init() { + beam.RegisterType(reflect.TypeOf((*wrappedCallerOnlyUserType)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*Request)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*Response)(nil)).Elem()) + register.Emitter1[*Response]() + register.DoFn3x1[context.Context, *Request, func(*Response), error](&callerFn{}) +} + +// Caller is an interface to a Web API endpoint. +type Caller interface { + + // Call a Web API endpoint with a Request, yielding a Response. + Call(ctx context.Context, request *Request) (*Response, error) +} + +// SetupTeardown interfaces methods called during a setup and teardown DoFn lifecycle. +// Some clients to Web APIs may need to establish resources or network connections prior to +// calling a Web API. This is a separate interface from a Caller since some Web API endpoints +// do not require this. 
An internal hybrid interface is hidden for user convenience so that +// a developer need only provide what makes sense for the API read or write task. +type SetupTeardown interface { + + // Setup is called during a DoFn setup lifecycle method. + // Clients that need to instantiate resources would be performed within an implementation of this method. + Setup(ctx context.Context) error + + // Teardown is called during a DoFn teardown lifecycle method. + // Clients that need to destroy or close resources would be performed within an implementation of this method. + Teardown(ctx context.Context) error +} + +// Request holds the encoded payload of a Web API request. +// Caller implementations are responsible for decoding the Payload into the needed type required +// to fulfill the Web API request. +type Request struct { + + // Payload is the encoded Web API request. + Payload []byte `beam:"payload"`

Review Comment: Question: Is this the approach used for the Java equivalent? That is, the input is encoded, and then must be decoded by the caller? While this is flexible, the big downside of this approach is that encoding/decoding is typically the most expensive operation, so forcing that behavior renders various optimizations, like fusion, ineffective without significant user-side work.

########## sdks/go/pkg/beam/io/webapi/doc.go: ########## @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package webapi supports reading from or writing to Web APIs.

Review Comment: I know this is a work in progress, but the end goal isn't yet clear from this PR. That is the context of the following comment. Right now, this is an alternative API wrapper for a DoFn that has a dead letter queue. So, as presently provided, how different is the best future experience this API provides versus a user writing a DoFn directly? What do that future experience and its benefits look like? Remember, the point of an API wrapper like this is to make it easier for a user to do a complicated thing than it would be to do that thing directly in the framework. Right now, it's about the same. But with throttling, retries, and similar features that are harder to implement correctly but could be made convenient and consistent, that changes quickly.

########## sdks/go/pkg/beam/io/webapi/webapi.go: ########## @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package webapi + +import ( + "bytes" + "context" + "fmt" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "reflect" +) + +func init() { + beam.RegisterType(reflect.TypeOf((*wrappedCallerOnlyUserType)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*Request)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*Response)(nil)).Elem()) + register.Emitter1[*Response]() + register.DoFn3x1[context.Context, *Request, func(*Response), error](&callerFn{}) +} + +// Caller is an interface to a Web API endpoint. +type Caller interface { + + // Call a Web API endpoint with a Request, yielding a Response. + Call(ctx context.Context, request *Request) (*Response, error) +} + +// SetupTeardown interfaces methods called during a setup and teardown DoFn lifecycle. +// Some clients to Web APIs may need to establish resources or network connections prior to +// calling a Web API. This is a separate interface from a Caller since some Web API endpoints +// do not require this. An internal hybrid interface is hidden for user convenience so that +// a developer need only provide what makes sense for the API read or write task. +type SetupTeardown interface { + + // Setup is called during a DoFn setup lifecycle method. + // Clients that need to instantiate resources would be performed within an implementation of this method. + Setup(ctx context.Context) error + + // Teardown is called during a DoFn teardown lifecycle method. 
+ // Clients that need to destroy or close resources would be performed within an implementation of this method. + Teardown(ctx context.Context) error +} + +// Request holds the encoded payload of a Web API request. +// Caller implementations are responsible for decoding the Payload into the needed type required +// to fulfill the Web API request. +type Request struct { + + // Payload is the encoded Web API request. + Payload []byte `beam:"payload"`

Review Comment: Note: we can make this generic instead of forcing inefficient user-side encoding and decoding; it just requires additional registration steps due to the current SDK's limitations with respect to registering types. For example, the closest to the best experience I think we could manage is:

```
func init() {
	webapi.RegisterCaller(&myCaller{})
}
...
responses, failures := webapi.Call(s, &myCaller{/*config*/}, requests)
...
```

With the most recent Go versions (1.21, I think?), that function should be able to infer the types. Make the interface and implementation types generic; this avoids the extra wrapper types for Request and Response. The callerFn type can then become generic on the user's caller type and on the request and response types of the Caller interface methods, which are also pushed into the interface.

########## sdks/go/pkg/beam/io/webapi/doc.go: ########## @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package webapi supports reading from or writing to Web APIs. + +Its design goals are to reduce the boilerplate of building Beam I/O connectors. See tracking issue: +https://github.com/apache/beam/issues/30423 and visit the Beam website +(https://beam.apache.org/documentation/io/built-in/webapis/) for details and examples. + +# Basic usage + +Basic usage requires providing a Caller to the Call func. + + var _ webapi.Caller = &myCaller{} + type myCaller struct { + // Make configuration details public and tag with: `beam:"endpoint"` so that they are encoded/decoded by Beam. + Endpoint string `beam:"endpoint"` + } + + // Call posts webapi.Request's JSON Payload in this example to myCaller's Endpoint. + // Returns a webapi.Response containing the HTTP response body. + func (caller *myCaller) Call(ctx context.Context, request *webapi.Request) (*webapi.Response, error) { + resp, err := http.Post(caller.Endpoint, "application/json", bytes.NewBuffer(request.Payload)) + if err != nil { + return nil, err + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + return &webapi.Response{ + Payload: body, + }, nil + } + +To use the Caller in a Beam PTransform, simply provide it to the Call func which returns a tuple of PCollections, +on for successful responses and another for any errors. + + requests := // PCollection of *webapi.Request. + responses, failures := Call(&myCaller{}) Review Comment: A few things about this example: 1. Not providing the scope parameter. 2. Not providing the input pcollection parameter. 3. 
Not namespacing it as `webapi.Call` which is how a user would actually see the code. 4. It doesn't look like this "Call" function exists yet?
