This is an automated email from the ASF dual-hosted git repository.

danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e2565a2848 [BEAM-14513] Add read transform and initial healthcare client (#17748)
4e2565a2848 is described below

commit 4e2565a2848f84a28746ace0287800af3e93a2e4
Author: Lucas Nogueira <[email protected]>
AuthorDate: Wed Jun 1 17:20:56 2022 +0000

    [BEAM-14513] Add read transform and initial healthcare client (#17748)
    
    * initial fhirio go sdk commit - client + read transform
    
    * add license comment
    
    * improve error message
    
    * rename import to follow convention
    
    * add metrics
    
    * follow import convention
    
    * renames and simplify folder structure
    
    * fix import to follow standard
    
    * add comments
    
    * improve variable naming
    
    * adjust comment formatting
    
    * remove unnecessary named import
    
    * adjust naming for consistency
    
    * use forward declarations for metrics
    
    * improve error messages to follow go style
    
    * improve metrics assertions
    
    * simplify function signature
    
    * use optimized register function
    
    * rollback settings changes commit mistakenly
    
    * use time.Since for staticcheck
---
 sdks/go/pkg/beam/io/fhirio/common.go     | 53 +++++++++++++++++
 sdks/go/pkg/beam/io/fhirio/fakes_test.go | 39 +++++++++++++
 sdks/go/pkg/beam/io/fhirio/read.go       | 98 ++++++++++++++++++++++++++++++++
 sdks/go/pkg/beam/io/fhirio/read_test.go  | 90 +++++++++++++++++++++++++++++
 4 files changed, 280 insertions(+)

diff --git a/sdks/go/pkg/beam/io/fhirio/common.go b/sdks/go/pkg/beam/io/fhirio/common.go
new file mode 100644
index 00000000000..8156b42ab35
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/common.go
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare FHIR stores.
+// Experimental.
+package fhirio
+
+import (
+       "context"
+       "net/http"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core"
+       "google.golang.org/api/healthcare/v1"
+       "google.golang.org/api/option"
+)
+
+const (
+       // baseMetricPrefix is prepended to the name of every metric created in
+       // readResourceFn.Setup.
+       baseMetricPrefix = "fhirio/"
+       // userAgent is handed to the healthcare service through
+       // option.WithUserAgent; it is suffixed with core.SdkVersion.
+       userAgent        = "apache-beam-io-google-cloud-platform-healthcare/" + 
core.SdkVersion
+)
+
+// fhirStoreClient abstracts the calls made against a FHIR store so that tests
+// can substitute a fake implementation.
+type fhirStoreClient interface {
+       // readResource fetches the resource at resourcePath and returns the raw
+       // HTTP response.
+       readResource(resourcePath string) (*http.Response, error)
+}
+
+// fhirStoreClientImpl is the fhirStoreClient implementation that talks to the
+// Google Cloud Healthcare API.
+type fhirStoreClientImpl struct {
+       // fhirService is the handle readResource uses to issue read requests.
+       fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
+}
+
+// newFhirStoreClient builds a client on top of a freshly created healthcare
+// service that identifies itself with the package user agent. It panics when
+// the service cannot be created.
+func newFhirStoreClient() *fhirStoreClientImpl {
+       svc, err := healthcare.NewService(context.Background(), option.WithUserAgent(userAgent))
+       if err != nil {
+               panic("Failed to initialize Google Cloud Healthcare Service. Reason: " + err.Error())
+       }
+
+       fhirSvc := healthcare.NewProjectsLocationsDatasetsFhirStoresFhirService(svc)
+       return &fhirStoreClientImpl{fhirService: fhirSvc}
+}
+
+// readResource issues a read request for resourcePath and returns the HTTP
+// response together with any error reported by the call.
+func (c *fhirStoreClientImpl) readResource(resourcePath string) (*http.Response, error) {
+       readCall := c.fhirService.Read(resourcePath)
+       return readCall.Do()
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/fakes_test.go b/sdks/go/pkg/beam/io/fhirio/fakes_test.go
new file mode 100644
index 00000000000..4e1a51aeb23
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/fakes_test.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import "net/http"
+
+// fakeFhirStoreClient is a test double for fhirStoreClient whose behavior is
+// supplied by the test through a function field.
+type fakeFhirStoreClient struct {
+       // fakeReadResources is invoked by readResource with the requested path.
+       fakeReadResources func(string) (*http.Response, error)
+}
+
+// readResource delegates to the fakeReadResources function configured on the
+// fake and returns its results unchanged.
+func (c *fakeFhirStoreClient) readResource(resourcePath string) (*http.Response, error) {
+       response, err := c.fakeReadResources(resourcePath)
+       return response, err
+}
+
+// fakeReaderCloser is useful to fake the Body of a http.Response: reads are
+// served by the fakeRead function and Close does nothing.
+type fakeReaderCloser struct {
+       // fakeRead is invoked by Read with the caller's buffer.
+       fakeRead func([]byte) (int, error)
+}
+
+// Close always reports success; the fake holds nothing that needs releasing.
+func (*fakeReaderCloser) Close() error {
+       var err error
+       return err
+}
+
+// Read hands the buffer to the configured fakeRead function and returns its
+// results unchanged.
+func (r *fakeReaderCloser) Read(b []byte) (int, error) {
+       n, err := r.fakeRead(b)
+       return n, err
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/read.go b/sdks/go/pkg/beam/io/fhirio/read.go
new file mode 100644
index 00000000000..a710c92a869
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/read.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare FHIR stores.
+// Experimental.
+package fhirio
+
+import (
+       "context"
+       "io"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+// init registers readResourceFn, together with the string emitter type used by
+// its two outputs, through the Beam register package.
+func init() {
+       register.DoFn4x0[context.Context, string, func(string), 
func(string)]((*readResourceFn)(nil))
+       register.Emitter1[string]()
+}
+
+// readResourceFn is the DoFn behind Read: it fetches one resource per input
+// path through client and tracks the outcome with the metrics below.
+type readResourceFn struct {
+       // client performs the read; when nil, Setup creates the default client.
+       client                fhirStoreClient
+       // readResourceErrors counts elements sent to the dead-letter output.
+       readResourceErrors    beam.Counter
+       // readResourceSuccess counts resources emitted successfully.
+       readResourceSuccess   beam.Counter
+       // readResourceLatencyMs records how long each read request took, in
+       // milliseconds.
+       readResourceLatencyMs beam.Distribution
+}
+
+// String returns the fixed name of this DoFn, which Setup passes as the first
+// argument when it creates the metrics.
+func (fn readResourceFn) String() string {
+       const name = "readResourceFn"
+       return name
+}
+
+// Setup falls back to the default FHIR store client when none was injected and
+// creates the error, success and latency metrics used by ProcessElement.
+func (fn *readResourceFn) Setup() {
+       if fn.client == nil {
+               fn.client = newFhirStoreClient()
+       }
+
+       // Every metric is created with the DoFn name as its first argument.
+       namespace := fn.String()
+       fn.readResourceErrors = beam.NewCounter(namespace, baseMetricPrefix+"read_resource_error_count")
+       fn.readResourceSuccess = beam.NewCounter(namespace, baseMetricPrefix+"read_resource_success_count")
+       fn.readResourceLatencyMs = beam.NewDistribution(namespace, baseMetricPrefix+"read_resource_latency_ms")
+}
+
+// ProcessElement reads the resource at resourcePath from the FHIR store.
+//
+// On success the response body is emitted through emitResource. When the
+// request fails, the response status is not 200, or the body cannot be read, a
+// descriptive message is emitted through emitDeadLetter instead. The request
+// latency is always recorded, and exactly one of the success/error counters is
+// incremented per element.
+func (fn *readResourceFn) ProcessElement(ctx context.Context, resourcePath string, emitResource, emitDeadLetter func(string)) {
+       timeBeforeReadRequest := time.Now()
+       response, err := fn.client.readResource(resourcePath)
+       fn.readResourceLatencyMs.Update(ctx, time.Since(timeBeforeReadRequest).Milliseconds())
+
+       if err != nil {
+               fn.readResourceErrors.Inc(ctx, 1)
+               emitDeadLetter(errors.Wrapf(err, "failed fetching resource [%s]", resourcePath).Error())
+               return
+       }
+
+       // Release the response body on every remaining path (bad status, read
+       // error and success) so the underlying connection is not leaked. The nil
+       // check keeps clients that return a response without a body working.
+       if response.Body != nil {
+               defer response.Body.Close()
+       }
+
+       if response.StatusCode != 200 {
+               fn.readResourceErrors.Inc(ctx, 1)
+               emitDeadLetter(errors.Errorf("fetched resource [%s] returned bad status [%d]", resourcePath, response.StatusCode).Error())
+               return
+       }
+
+       bytes, err := io.ReadAll(response.Body)
+       if err != nil {
+               fn.readResourceErrors.Inc(ctx, 1)
+               emitDeadLetter(errors.Wrapf(err, "error reading response body of resource [%s]", resourcePath).Error())
+               return
+       }
+
+       fn.readResourceSuccess.Inc(ctx, 1)
+       emitResource(string(bytes))
+}
+
+// Read retrieves resources from Google Cloud Healthcare FHIR stores. The input
+// is a PCollection<string> of resource paths, as carried by notifications from
+// the FHIR store, and each path is resolved to the resource object it points
+// to. Two PCollection<string> are returned: the first holds the fetched
+// objects as JSON-encoded strings, the second is a dead-letter collection with
+// an error message for every object that failed to be fetched.
+// See: https://cloud.google.com/healthcare-api/docs/how-tos/fhir-resources#getting_a_fhir_resource.
+func Read(s beam.Scope, resourcePaths beam.PCollection) (beam.PCollection, beam.PCollection) {
+       return read(s.Scope("fhirio.Read"), resourcePaths, nil)
+}
+
+// read applies a readResourceFn that uses the given client to resourcePaths
+// and returns its two outputs. With a nil client the DoFn creates the default
+// client in Setup.
+func read(s beam.Scope, resourcePaths beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
+       fn := &readResourceFn{client: client}
+       return beam.ParDo2(s, fn, resourcePaths)
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/read_test.go b/sdks/go/pkg/beam/io/fhirio/read_test.go
new file mode 100644
index 00000000000..4ed9bc5ab53
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/read_test.go
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+       "errors"
+       "net/http"
+       "strings"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+// TestRead runs the read transform against fake clients that fail in different
+// ways and checks that every input ends up on the dead-letter output with the
+// expected message, and that only the error counter is incremented.
+func TestRead(t *testing.T) {
+       // Each case supplies a fake client and a substring that must appear in
+       // every dead-letter message it produces.
+       testCases := []struct {
+               name           string
+               client         fhirStoreClient
+               containedError string
+       }{
+               {
+                       name: "Read Request Failed",
+                       client: &fakeFhirStoreClient{
+                               fakeReadResources: func(resource string) 
(*http.Response, error) {
+                                       return nil, errors.New("")
+                               },
+                       },
+                       containedError: "failed fetching resource",
+               },
+               {
+                       name: "Read Request Returns Bad Status",
+                       client: &fakeFhirStoreClient{
+                               fakeReadResources: func(resource string) 
(*http.Response, error) {
+                                       return &http.Response{StatusCode: 403}, 
nil
+                               },
+                       },
+                       containedError: "returned bad status",
+               },
+               {
+                       name: "Response body fails to be parsed",
+                       client: &fakeFhirStoreClient{
+                               fakeReadResources: func(resource string) 
(*http.Response, error) {
+                                       return &http.Response{Body: 
&fakeReaderCloser{
+                                               fakeRead: func([]byte) (int, 
error) {
+                                                       return 0, errors.New("")
+                                               },
+                                       }, StatusCode: 200}, nil
+                               },
+                       },
+                       containedError: "error reading response body",
+               },
+       }
+
+       testResourcePaths := []string{"foo", "bar"}
+       for _, testCase := range testCases {
+               t.Run(testCase.name, func(t *testing.T) {
+                       p, s, resourcePaths := 
ptest.CreateList(testResourcePaths)
+                       resources, failedReads := read(s, resourcePaths, 
testCase.client)
+                       // Nothing may succeed; every path must be dead-lettered.
+                       passert.Empty(s, resources)
+                       passert.Count(s, failedReads, "", 
len(testResourcePaths))
+                       passert.True(s, failedReads, func(errorMsg string) bool 
{
+                               return strings.Contains(errorMsg, 
testCase.containedError)
+                       })
+                       pipelineResult := ptest.RunAndValidate(t, p)
+                       // The error counter must be the only counter reported, with
+                       // one increment per input path.
+                       counterResults := 
pipelineResult.Metrics().AllMetrics().Counters()
+                       if len(counterResults) != 1 {
+                               t.Fatal("Only one counter should have been 
used")
+                       }
+                       if counterResults[0].Name() != 
"fhirio/read_resource_error_count" {
+                               t.Fatal("Only error counter should have been 
used")
+                       }
+                       if counterResults[0].Result() != 
int64(len(testResourcePaths)) {
+                               t.Fatal("Counter should have been incremented 
by the number of test resource paths")
+                       }
+               })
+       }
+}

Reply via email to