This is an automated email from the ASF dual-hosted git repository.
danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b9d99405aba Add ExecuteBundles transform to Go FhirIO (#21840)
b9d99405aba is described below
commit b9d99405abaf74889eb9cf05eec4d67ea275a6b4
Author: Lucas Nogueira <[email protected]>
AuthorDate: Tue Jun 21 20:30:22 2022 -0400
Add ExecuteBundles transform to Go FhirIO (#21840)
* squashed rebase conflict commits
* add execute bundles transform with integration test
* adjust import to follow convention
* fix variable scope
* adjust read unit tests
* fix bad status test case
* improve read unit test assertions and naming
* add unit tests
* add comment to execute bundles transform
* include error reason
* make variable exported to fix integration test
* adjust integration tests after merge
* update license comment
* unify unit test utilities in single file
* add comment explaining the purpose of unexported transform function
* remove unnecessary generic usage
* remove coded mistakenly added
* improve transaction vs batch bundle comment
* use net/http constants instead of hardcoded values
* return error instead of t.Fail inside helper function
* improve execute bundles DoFn identifier
* adjust import spacing
* improve error message
---
sdks/go/pkg/beam/io/fhirio/common.go | 56 ++++++++
sdks/go/pkg/beam/io/fhirio/execute_bundles.go | 146 +++++++++++++++++++++
sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go | 82 ++++++++++++
sdks/go/pkg/beam/io/fhirio/fakes_test.go | 39 ------
sdks/go/pkg/beam/io/fhirio/read.go | 45 +++----
sdks/go/pkg/beam/io/fhirio/read_test.go | 55 ++------
sdks/go/pkg/beam/io/fhirio/utils_test.go | 107 +++++++++++++++
sdks/go/test/integration/io/fhirio/fhirio_test.go | 46 +++++--
8 files changed, 456 insertions(+), 120 deletions(-)
diff --git a/sdks/go/pkg/beam/io/fhirio/common.go
b/sdks/go/pkg/beam/io/fhirio/common.go
index 98bc10e3306..666f9d6f0e0 100644
--- a/sdks/go/pkg/beam/io/fhirio/common.go
+++ b/sdks/go/pkg/beam/io/fhirio/common.go
@@ -19,10 +19,16 @@
package fhirio
import (
+ "bytes"
"context"
+ "io"
"net/http"
+ "regexp"
+ "time"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"google.golang.org/api/healthcare/v1"
"google.golang.org/api/option"
)
@@ -32,8 +38,54 @@ const (
baseMetricPrefix = "fhirio/"
)
+func executeRequestAndRecordLatency(ctx context.Context, latencyMs
*beam.Distribution, requestSupplier func() (*http.Response, error))
(*http.Response, error) {
+ timeBeforeReadRequest := time.Now()
+ response, err := requestSupplier()
+ latencyMs.Update(ctx, time.Since(timeBeforeReadRequest).Milliseconds())
+ return response, err
+}
+
+func extractBodyFrom(response *http.Response) (string, error) {
+ if isBadStatusCode(response.Status) {
+ return "", errors.Errorf("response contains bad status: [%v]",
response.Status)
+ }
+
+ bodyBytes, err := io.ReadAll(response.Body)
+ if err != nil {
+ return "", err
+ }
+
+ return string(bodyBytes), nil
+}
+
+func isBadStatusCode(status string) bool {
+ // 2XXs are successes, otherwise failure.
+ isMatch, err := regexp.MatchString("^2\\d{2}", status)
+ if err != nil {
+ return true
+ }
+ return !isMatch
+}
+
+type fhirioFnCommon struct {
+ client fhirStoreClient
+ resourcesErrorCount beam.Counter
+ resourcesSuccessCount beam.Counter
+ latencyMs beam.Distribution
+}
+
+func (fnc *fhirioFnCommon) setup(namespace string) {
+ if fnc.client == nil {
+ fnc.client = newFhirStoreClient()
+ }
+ fnc.resourcesErrorCount = beam.NewCounter(namespace,
baseMetricPrefix+"resource_error_count")
+ fnc.resourcesSuccessCount = beam.NewCounter(namespace,
baseMetricPrefix+"resource_success_count")
+ fnc.latencyMs = beam.NewDistribution(namespace,
baseMetricPrefix+"latency_ms")
+}
+
type fhirStoreClient interface {
readResource(resourcePath string) (*http.Response, error)
+ executeBundle(storePath string, bundle []byte) (*http.Response, error)
}
type fhirStoreClientImpl struct {
@@ -51,3 +103,7 @@ func newFhirStoreClient() *fhirStoreClientImpl {
func (c *fhirStoreClientImpl) readResource(resourcePath string)
(*http.Response, error) {
return c.fhirService.Read(resourcePath).Do()
}
+
+func (c *fhirStoreClientImpl) executeBundle(storePath string, bundle []byte)
(*http.Response, error) {
+ return c.fhirService.ExecuteBundle(storePath,
bytes.NewReader(bundle)).Do()
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/execute_bundles.go
b/sdks/go/pkg/beam/io/fhirio/execute_bundles.go
new file mode 100644
index 00000000000..c96d72fae39
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/execute_bundles.go
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "net/http"
+ "strings"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+const (
+ bundleResponseTypeBatch = "batch-response"
+ bundleResponseTypeTransaction = "transaction-response"
+)
+
+func init() {
+ register.DoFn4x0[context.Context, []byte, func(string),
func(string)]((*executeBundleFn)(nil))
+ register.Emitter1[string]()
+}
+
+type executeBundleFn struct {
+ fhirioFnCommon
+ successesCount beam.Counter
+ // Path to FHIR store where bundle requests will be executed on.
+ FhirStorePath string
+}
+
+func (fn executeBundleFn) String() string {
+ return "executeBundleFn"
+}
+
+func (fn *executeBundleFn) Setup() {
+ fn.fhirioFnCommon.setup(fn.String())
+ fn.successesCount = beam.NewCounter(fn.String(),
baseMetricPrefix+"success_count")
+}
+
+func (fn *executeBundleFn) ProcessElement(ctx context.Context, inputBundleBody
[]byte, emitSuccess, emitFailure func(string)) {
+ response, err := executeRequestAndRecordLatency(ctx, &fn.latencyMs,
func() (*http.Response, error) {
+ return fn.client.executeBundle(fn.FhirStorePath,
inputBundleBody)
+ })
+ if err != nil {
+ fn.resourcesErrorCount.Inc(ctx, 1)
+ emitFailure(errors.Wrap(err, "execute bundle request returned
error").Error())
+ return
+ }
+
+ body, err := extractBodyFrom(response)
+ if err != nil {
+ fn.resourcesErrorCount.Inc(ctx, 1)
+ emitFailure(errors.Wrap(err, "could not extract body from
execute bundles response").Error())
+ return
+ }
+
+ fn.processResponseBody(ctx, body, emitSuccess, emitFailure)
+}
+
+func (fn *executeBundleFn) processResponseBody(ctx context.Context, body
string, emitSuccess, emitFailure func(string)) {
+ var bodyFields struct {
+ Type string `json:"type"`
+ Entries []interface{} `json:"entry"`
+ }
+
+ err := json.NewDecoder(strings.NewReader(body)).Decode(&bodyFields)
+ if err != nil {
+ fn.resourcesErrorCount.Inc(ctx, 1)
+ emitFailure(errors.Wrap(err, "could not parse body from execute
bundle response").Error())
+ return
+ }
+
+ if bodyFields.Entries == nil {
+ return
+ }
+
+ // A BATCH bundle returns a success response even if entries have
failures, as
+ // entries are executed separately. However, TRANSACTION bundles should
return
+ // error response (in client.executeBundle call) if any entry fails.
Therefore,
+ // for BATCH bundles we need to parse the error and success counters.
+ switch bodyFields.Type {
+ case bundleResponseTypeTransaction:
+ fn.resourcesSuccessCount.Inc(ctx,
int64(len(bodyFields.Entries)))
+ emitSuccess(body)
+ case bundleResponseTypeBatch:
+ for _, entry := range bodyFields.Entries {
+ var entryFields struct {
+ Response struct {
+ Status string `json:"status"`
+ } `json:"response"`
+ }
+ entryBytes, _ := json.Marshal(entry)
+ _ =
json.NewDecoder(bytes.NewReader(entryBytes)).Decode(&entryFields)
+ if entryFields.Response.Status == "" {
+ continue
+ }
+
+ if isBadStatusCode(entryFields.Response.Status) {
+ fn.resourcesErrorCount.Inc(ctx, 1)
+ emitFailure(errors.Errorf("execute bundles
entry contains bad status: [%v]", entryFields.Response.Status).Error())
+ } else {
+ fn.resourcesSuccessCount.Inc(ctx, 1)
+ emitSuccess(string(entryBytes))
+ }
+ }
+ }
+
+ fn.successesCount.Inc(ctx, 1)
+}
+
+// ExecuteBundles performs all the requests in the specified bundles on a given
+// FHIR store. This transform takes a path to a FHIR store and a PCollection of
+// bundles as JSON-encoded strings. It executes the requests defined on the
+// bundles on the FHIR store located on the provided path. It outputs two
+// PCollection<string>, the first containing the response bodies of the
+// successfully performed requests and the second one error messages of the
+// requests that failed to be executed.
+// See:
https://cloud.google.com/healthcare-api/docs/samples/healthcare-fhir-execute-bundle
+func ExecuteBundles(s beam.Scope, fhirStorePath string, bundles
beam.PCollection) (beam.PCollection, beam.PCollection) {
+ s = s.Scope("fhirio.ExecuteBundles")
+ return executeBundles(s, fhirStorePath, bundles, nil)
+}
+
+// This is useful as an entry point for testing because we can provide a fake
FHIR store client.
+func executeBundles(s beam.Scope, fhirStorePath string, bundles
beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
+ return beam.ParDo2(s, &executeBundleFn{fhirioFnCommon:
fhirioFnCommon{client: client}, FhirStorePath: fhirStorePath}, bundles)
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go
b/sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go
new file mode 100644
index 00000000000..c0eb1174bb1
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+ "bytes"
+ "net/http"
+ "strings"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+func TestExecuteBundles(t *testing.T) {
+ testCases := []struct {
+ name string
+ client fhirStoreClient
+ containedError string
+ }{
+ {
+ name: "Execute Bundles request returns error",
+ client: requestReturnErrorFakeClient,
+ containedError: fakeRequestReturnErrorMessage,
+ },
+ {
+ name: "Execute Bundles request returns bad
status",
+ client: badStatusFakeClient,
+ containedError: fakeBadStatus,
+ },
+ {
+ name: "Execute Bundles request response body
fails to be read",
+ client: bodyReaderErrorFakeClient,
+ containedError: fakeBodyReaderErrorMessage,
+ },
+ {
+ name: "Execute Bundles request response body failed to
be decoded",
+ client: &fakeFhirStoreClient{
+ fakeExecuteBundles: func(storePath string,
bundle []byte) (*http.Response, error) {
+ return &http.Response{
+ Body: &fakeReaderCloser{
+ fakeRead: func(t
[]byte) (int, error) {
+ return
bytes.NewReader([]byte("")).Read(t)
+ },
+ }, Status: "200 Ok"}, nil
+ },
+ },
+ containedError: "EOF",
+ },
+ }
+
+ testBundles := [][]byte{[]byte("foo"), []byte("bar")}
+ for _, testCase := range testCases {
+ t.Run(testCase.name, func(t *testing.T) {
+ p, s, bundles := ptest.CreateList(testBundles)
+ successfulBodies, failures := executeBundles(s, "bla",
bundles, testCase.client)
+ passert.Empty(s, successfulBodies)
+ passert.Count(s, failures, "", len(testBundles))
+ passert.True(s, failures, func(errorMsg string) bool {
+ return strings.Contains(errorMsg,
testCase.containedError)
+ })
+ pipelineResult := ptest.RunAndValidate(t, p)
+ err := validateResourceErrorCounter(pipelineResult,
len(testBundles))
+ if err != nil {
+ t.Fatalf("validateResourceErrorCounter returned
error [%v]", err.Error())
+ }
+ })
+ }
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/fakes_test.go
b/sdks/go/pkg/beam/io/fhirio/fakes_test.go
deleted file mode 100644
index 4e1a51aeb23..00000000000
--- a/sdks/go/pkg/beam/io/fhirio/fakes_test.go
+++ /dev/null
@@ -1,39 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package fhirio
-
-import "net/http"
-
-type fakeFhirStoreClient struct {
- fakeReadResources func(string) (*http.Response, error)
-}
-
-func (c *fakeFhirStoreClient) readResource(resourcePath string)
(*http.Response, error) {
- return c.fakeReadResources(resourcePath)
-}
-
-// Useful to fake the Body of a http.Response.
-type fakeReaderCloser struct {
- fakeRead func([]byte) (int, error)
-}
-
-func (*fakeReaderCloser) Close() error {
- return nil
-}
-
-func (m *fakeReaderCloser) Read(b []byte) (int, error) {
- return m.fakeRead(b)
-}
diff --git a/sdks/go/pkg/beam/io/fhirio/read.go
b/sdks/go/pkg/beam/io/fhirio/read.go
index a710c92a869..41c53a540b6 100644
--- a/sdks/go/pkg/beam/io/fhirio/read.go
+++ b/sdks/go/pkg/beam/io/fhirio/read.go
@@ -20,8 +20,7 @@ package fhirio
import (
"context"
- "io"
- "time"
+ "net/http"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -34,10 +33,7 @@ func init() {
}
type readResourceFn struct {
- client fhirStoreClient
- readResourceErrors beam.Counter
- readResourceSuccess beam.Counter
- readResourceLatencyMs beam.Distribution
+ fhirioFnCommon
}
func (fn readResourceFn) String() string {
@@ -45,40 +41,28 @@ func (fn readResourceFn) String() string {
}
func (fn *readResourceFn) Setup() {
- if fn.client == nil {
- fn.client = newFhirStoreClient()
- }
- fn.readResourceErrors = beam.NewCounter(fn.String(),
baseMetricPrefix+"read_resource_error_count")
- fn.readResourceSuccess = beam.NewCounter(fn.String(),
baseMetricPrefix+"read_resource_success_count")
- fn.readResourceLatencyMs = beam.NewDistribution(fn.String(),
baseMetricPrefix+"read_resource_latency_ms")
+ fn.fhirioFnCommon.setup(fn.String())
}
func (fn *readResourceFn) ProcessElement(ctx context.Context, resourcePath
string, emitResource, emitDeadLetter func(string)) {
- timeBeforeReadRequest := time.Now()
- response, err := fn.client.readResource(resourcePath)
- fn.readResourceLatencyMs.Update(ctx,
time.Since(timeBeforeReadRequest).Milliseconds())
-
+ response, err := executeRequestAndRecordLatency(ctx, &fn.latencyMs,
func() (*http.Response, error) {
+ return fn.client.readResource(resourcePath)
+ })
if err != nil {
- fn.readResourceErrors.Inc(ctx, 1)
- emitDeadLetter(errors.Wrapf(err, "failed fetching resource
[%s]", resourcePath).Error())
- return
- }
-
- if response.StatusCode != 200 {
- fn.readResourceErrors.Inc(ctx, 1)
- emitDeadLetter(errors.Errorf("fetched resource [%s] returned
bad status [%d]", resourcePath, response.StatusCode).Error())
+ fn.resourcesErrorCount.Inc(ctx, 1)
+ emitDeadLetter(errors.Wrapf(err, "read resource request
returned error on input: [%v]", resourcePath).Error())
return
}
- bytes, err := io.ReadAll(response.Body)
+ body, err := extractBodyFrom(response)
if err != nil {
- fn.readResourceErrors.Inc(ctx, 1)
- emitDeadLetter(errors.Wrapf(err, "error reading response body
of resource [%s]", resourcePath).Error())
+ fn.resourcesErrorCount.Inc(ctx, 1)
+ emitDeadLetter(errors.Wrapf(err, "could not extract body from
read resource [%v] response", resourcePath).Error())
return
}
- fn.readResourceSuccess.Inc(ctx, 1)
- emitResource(string(bytes))
+ fn.resourcesSuccessCount.Inc(ctx, 1)
+ emitResource(body)
}
// Read fetches resources from Google Cloud Healthcare FHIR stores based on the
@@ -93,6 +77,7 @@ func Read(s beam.Scope, resourcePaths beam.PCollection)
(beam.PCollection, beam.
return read(s, resourcePaths, nil)
}
+// This is useful as an entry point for testing because we can provide a fake
FHIR store client.
func read(s beam.Scope, resourcePaths beam.PCollection, client
fhirStoreClient) (beam.PCollection, beam.PCollection) {
- return beam.ParDo2(s, &readResourceFn{client: client}, resourcePaths)
+ return beam.ParDo2(s, &readResourceFn{fhirioFnCommon:
fhirioFnCommon{client: client}}, resourcePaths)
}
diff --git a/sdks/go/pkg/beam/io/fhirio/read_test.go
b/sdks/go/pkg/beam/io/fhirio/read_test.go
index 7b8cba1af8d..39ae484b92b 100644
--- a/sdks/go/pkg/beam/io/fhirio/read_test.go
+++ b/sdks/go/pkg/beam/io/fhirio/read_test.go
@@ -16,8 +16,6 @@
package fhirio
import (
- "errors"
- "net/http"
"strings"
"testing"
@@ -32,35 +30,19 @@ func TestRead(t *testing.T) {
containedError string
}{
{
- name: "Read Request Failed",
- client: &fakeFhirStoreClient{
- fakeReadResources: func(resource string)
(*http.Response, error) {
- return nil, errors.New("")
- },
- },
- containedError: "failed fetching resource",
+ name: "Read request returns error",
+ client: requestReturnErrorFakeClient,
+ containedError: fakeRequestReturnErrorMessage,
},
{
- name: "Read Request Returns Bad Status",
- client: &fakeFhirStoreClient{
- fakeReadResources: func(resource string)
(*http.Response, error) {
- return &http.Response{StatusCode: 403},
nil
- },
- },
- containedError: "returned bad status",
+ name: "Read request returns bad status",
+ client: badStatusFakeClient,
+ containedError: fakeBadStatus,
},
{
- name: "Response body fails to be parsed",
- client: &fakeFhirStoreClient{
- fakeReadResources: func(resource string)
(*http.Response, error) {
- return &http.Response{Body:
&fakeReaderCloser{
- fakeRead: func([]byte) (int,
error) {
- return 0, errors.New("")
- },
- }, StatusCode: 200}, nil
- },
- },
- containedError: "error reading response body",
+ name: "Read request response body fails to be
read",
+ client: bodyReaderErrorFakeClient,
+ containedError: fakeBodyReaderErrorMessage,
},
}
@@ -75,23 +57,10 @@ func TestRead(t *testing.T) {
return strings.Contains(errorMsg,
testCase.containedError)
})
pipelineResult := ptest.RunAndValidate(t, p)
- counterResults :=
pipelineResult.Metrics().AllMetrics().Counters()
-
- if len(counterResults) != 1 {
- t.Fatalf("counterResults got length %v,
expected %v", len(counterResults), 1)
- }
- counterResult := counterResults[0]
-
- expectedCounterName :=
"fhirio/read_resource_error_count"
- if counterResult.Name() != expectedCounterName {
- t.Fatalf("counterResult.Name() is '%v',
expected '%v'", counterResult.Name(), expectedCounterName)
+ err := validateResourceErrorCounter(pipelineResult,
len(testResourcePaths))
+ if err != nil {
+ t.Fatalf("validateResourceErrorCounter returned
error [%v]", err.Error())
}
-
- expectedCounterResult := int64(len(testResourcePaths))
- if counterResult.Result() != expectedCounterResult {
- t.Fatalf("counterResult.Result() is %v,
expected %v", counterResult.Result(), expectedCounterResult)
- }
-
})
}
}
diff --git a/sdks/go/pkg/beam/io/fhirio/utils_test.go
b/sdks/go/pkg/beam/io/fhirio/utils_test.go
new file mode 100644
index 00000000000..a8ffefd207c
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/utils_test.go
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+ "errors"
+ "fmt"
+ "net/http"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+)
+
+var (
+ fakeRequestReturnErrorMessage = "internal error"
+ requestReturnErrorFakeClient = &fakeFhirStoreClient{
+ fakeReadResources: func(resource string) (*http.Response,
error) {
+ return nil, errors.New(fakeRequestReturnErrorMessage)
+ },
+ fakeExecuteBundles: func(storePath string, bundle []byte)
(*http.Response, error) {
+ return nil, errors.New(fakeRequestReturnErrorMessage)
+ },
+ }
+
+ fakeBadStatus = "403 Forbidden"
+ badStatusFakeResponse = &http.Response{Status: fakeBadStatus}
+ badStatusFakeClient = &fakeFhirStoreClient{
+ fakeReadResources: func(resource string) (*http.Response,
error) {
+ return badStatusFakeResponse, nil
+ },
+ fakeExecuteBundles: func(storePath string, bundle []byte)
(*http.Response, error) {
+ return badStatusFakeResponse, nil
+ },
+ }
+
+ fakeBodyReaderErrorMessage = "ReadAll fail"
+ bodyReaderErrorFakeResponse = &http.Response{
+ Body: &fakeReaderCloser{
+ fakeRead: func([]byte) (int, error) {
+ return 0, errors.New(fakeBodyReaderErrorMessage)
+ },
+ }, Status: "200 Ok"}
+ bodyReaderErrorFakeClient = &fakeFhirStoreClient{
+ fakeReadResources: func(resource string) (*http.Response,
error) {
+ return bodyReaderErrorFakeResponse, nil
+ },
+ fakeExecuteBundles: func(storePath string, bundle []byte)
(*http.Response, error) {
+ return bodyReaderErrorFakeResponse, nil
+ },
+ }
+)
+
+type fakeFhirStoreClient struct {
+ fakeReadResources func(string) (*http.Response, error)
+ fakeExecuteBundles func(storePath string, bundle []byte)
(*http.Response, error)
+}
+
+func (c *fakeFhirStoreClient) executeBundle(storePath string, bundle []byte)
(*http.Response, error) {
+ return c.fakeExecuteBundles(storePath, bundle)
+}
+
+func (c *fakeFhirStoreClient) readResource(resourcePath string)
(*http.Response, error) {
+ return c.fakeReadResources(resourcePath)
+}
+
+// Useful to fake the Body of a http.Response.
+type fakeReaderCloser struct {
+ fakeRead func([]byte) (int, error)
+}
+
+func (*fakeReaderCloser) Close() error {
+ return nil
+}
+
+func (m *fakeReaderCloser) Read(b []byte) (int, error) {
+ return m.fakeRead(b)
+}
+
+func validateResourceErrorCounter(pipelineResult beam.PipelineResult,
expectedCount int) error {
+ counterResults := pipelineResult.Metrics().AllMetrics().Counters()
+ if len(counterResults) != 1 {
+ return fmt.Errorf("counterResults got length %v, expected %v",
len(counterResults), 1)
+ }
+ counterResult := counterResults[0]
+
+ expectedCounterName := "fhirio/resource_error_count"
+ if counterResult.Name() != expectedCounterName {
+ return fmt.Errorf("counterResult.Name() is '%v', expected
'%v'", counterResult.Name(), expectedCounterName)
+ }
+
+ if counterResult.Result() != int64(expectedCount) {
+ return fmt.Errorf("counterResult.Result() is %v, expected %v",
counterResult.Result(), expectedCount)
+ }
+ return nil
+}
diff --git a/sdks/go/test/integration/io/fhirio/fhirio_test.go
b/sdks/go/test/integration/io/fhirio/fhirio_test.go
index 538d1c6b78f..560e74f4522 100644
--- a/sdks/go/test/integration/io/fhirio/fhirio_test.go
+++ b/sdks/go/test/integration/io/fhirio/fhirio_test.go
@@ -24,6 +24,7 @@ import (
"flag"
"fmt"
"math/big"
+ "net/http"
"os"
"strconv"
"strings"
@@ -62,11 +63,20 @@ func checkFlags(t *testing.T) {
}
}
+func setupFhirStoreWithData(t *testing.T) (string, []string, func()) {
+ return setupFhirStore(t, true)
+}
+
+func setupEmptyFhirStore(t *testing.T) (string, func()) {
+ storePath, _, teardownFunc := setupFhirStore(t, false)
+ return storePath, teardownFunc
+}
+
// Sets up a test fhir store by creating and populating data to it for testing
// purposes. It returns the name of the created store path, a slice of the
// resource paths to be used in tests, and a function to teardown what has been
// set up.
-func setupFhirStore(t *testing.T) (string, []string, func()) {
+func setupFhirStore(t *testing.T, shouldPopulateStore bool) (string, []string,
func()) {
t.Helper()
if storeService == nil || storeManagementService == nil {
t.Fatal("Healthcare Services were not initialized")
@@ -75,13 +85,16 @@ func setupFhirStore(t *testing.T) (string, []string,
func()) {
healthcareDataset := fmt.Sprintf(datasetPathFmt, *gcpopts.Project,
*gcpopts.Region)
createdFhirStore, err := createStore(healthcareDataset)
if err != nil {
- t.Fatal("Test store failed to be created")
+ t.Fatalf("Test store failed to be created. Reason: %v",
err.Error())
}
createdFhirStorePath := createdFhirStore.Name
- resourcePaths := populateStore(createdFhirStorePath)
- if len(resourcePaths) == 0 {
- t.Fatal("No data got populated to test")
+ var resourcePaths []string
+ if shouldPopulateStore {
+ resourcePaths = populateStore(createdFhirStorePath)
+ if len(resourcePaths) == 0 {
+ t.Fatal("No data got populated to test")
+ }
}
return createdFhirStorePath, resourcePaths, func() {
@@ -168,7 +181,7 @@ func TestFhirIO_Read(t *testing.T) {
integration.CheckFilters(t)
checkFlags(t)
- _, testResourcePaths, teardownFhirStore := setupFhirStore(t)
+ _, testResourcePaths, teardownFhirStore := setupFhirStoreWithData(t)
defer teardownFhirStore()
p, s, resourcePaths := ptest.CreateList(testResourcePaths)
@@ -183,7 +196,7 @@ func TestFhirIO_InvalidRead(t *testing.T) {
integration.CheckFilters(t)
checkFlags(t)
- fhirStorePath, _, teardownFhirStore := setupFhirStore(t)
+ fhirStorePath, _, teardownFhirStore := setupFhirStoreWithData(t)
defer teardownFhirStore()
invalidResourcePath := fhirStorePath + "/fhir/Patient/invalid"
@@ -192,12 +205,29 @@ func TestFhirIO_InvalidRead(t *testing.T) {
passert.Count(s, failedReads, "", 1)
passert.Empty(s, resources)
passert.True(s, failedReads, func(errorMsg string) bool {
- return strings.Contains(errorMsg, "bad status [404]")
+ return strings.Contains(errorMsg,
strconv.Itoa(http.StatusNotFound))
})
ptest.RunAndValidate(t, p)
}
+func TestFhirIO_ExecuteBundles(t *testing.T) {
+ integration.CheckFilters(t)
+ checkFlags(t)
+
+ fhirStorePath, teardownFhirStore := setupEmptyFhirStore(t)
+ defer teardownFhirStore()
+
+ p, s, bundles := ptest.CreateList(readPrettyBundles())
+ successBodies, failures := fhirio.ExecuteBundles(s, fhirStorePath,
bundles)
+ passert.Count(s, successBodies, "", 2)
+ passert.Count(s, failures, "", 2)
+ passert.True(s, failures, func(errorMsg string) bool {
+ return strings.Contains(errorMsg,
strconv.Itoa(http.StatusBadRequest))
+ })
+ ptest.RunAndValidate(t, p)
+}
+
func TestMain(m *testing.M) {
flag.Parse()
beam.Init()