devanshmodi commented on code in PR #28873:
URL: https://github.com/apache/beam/pull/28873#discussion_r1368967350
##########
examples/notebooks/healthcare/beam_post_hl7_messages_to_hcapi.ipynb:
##########
@@ -0,0 +1,504 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": [],
+ "private_outputs": true,
+ "toc_visible": true,
+ "include_colab_link": true
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "view-in-github",
+ "colab_type": "text"
+ },
+ "source": [
+ "<a
href=\"https://colab.research.google.com/github/devanshmodi/beam/blob/devanshmodi-patch-healthcare-hl7-to-hcapi/examples/notebooks/healthcare/beam_post_hl7_messages_to_hcapi.ipynb\"
target=\"_parent\"><img
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In
Colab\"/></a>"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "zQ_JXPR3RoFV"
+ },
+ "outputs": [],
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License\n",
+ "\n",
+ "##################################\n",
+ "# Author: Devansh Modi #\n",
+ "##################################\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Highlevel Architecture**\n",
+ "\n",
+ ""
+ ],
+ "metadata": {
+ "id": "RL1LDp645ogr"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# **Post Hl7v2 messages to Google Cloud Healthcare API HL7v2 store
pipeline**\n",
+ "\n",
+ "This example demonstrates how to set up an Apache Beam pipeline that
reads a HL7 file from [Google Cloud
Storage](https://https://cloud.google.com/storage), and calls the [Google Cloud
Healthcare API Hl7v2 store to store Hl7
messages](https://cloud.google.com/healthcare-api/docs/how-tos/hl7v2-messages)
to extract information from unstructured data. This application can be used in
contexts such as reading raw Hl7 messages, if needed parse them or modify them
as per your defined Hl7v2 store configurations and store data into Hl7v2
store.\n",
+ "\n",
+ "An Apache Beam pipeline is a pipeline that reads input data,
transforms that data, and writes output data. It consists of PTransforms and
PCollections. A PCollection represents a distributed data set that your Beam
pipeline operates on. A PTransform represents a data processing operation, or a
step, in your pipeline. It takes one or more PCollections as input, performs a
processing function that you provide on the elements of that PCollection, and
produces zero or more output PCollection objects.\n",
+ "\n",
+ "For details about Apache Beam pipelines, including PTransforms and
PCollections, visit the [Beam Programming
Guide](https://beam.apache.org/documentation/programming-guide/).\n",
+ "\n",
+ "You'll be able to use this notebook to explore the data in each
PCollection."
+ ],
+ "metadata": {
+ "id": "wC9KRrlORwKu"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "***Sample HL7v2 Message***\n",
+ "\n",
+ "The below reference message shows a sample Hl7v2 messages seperated
by \\r.\n",
+ "\n",
+
"**MSH|^~\\&|FROM_APP|FROM_FACILITY|TO_APP|TO_FACILITY|20150503223000||ADT^A01|20150503223000|P|2.5|\\r\n",
+ "EVN|A01|20110613083617|\\r\n",
+ "PID|1||21004053^^^^MRN||SULLY^BRIAN||19611209|M|||123 MAIN
ST^^MOUNTAIN SPRINGS^CO^80439|\\r\n",
+ "PV1||I|H73 RM1^1^^HIGHWAY 73 CLINIC||||5148^MARY
QUINN|||||||||Y||||||||||||||||||||||||||||20150503223000|**\n",
+ "\n",
+ "The file contains many such messages and the objective of this code
will be to split and construct messages and POST it to Google Cloud HealthCare
API HL7v2 store."
+ ],
+ "metadata": {
+ "id": "AOVYgtyaqSxa"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Lets install necessary packages"
+ ],
+ "metadata": {
+ "id": "81wCK9XnS6Sc"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "!pip install apache-beam[gcp]"
+ ],
+ "metadata": {
+ "id": "Yv1phmRZS23c"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**GCP Setup**\n",
+ "1. Authenticate your notebook by `gcloud auth application-default
login` in the Colab terminal.\n",
+ "\n",
+ "2. Run `gcloud config set project <YOUR-PROJECT>`\n",
+ "\n",
+ "Set the variables in the next cell based upon your project and
preferences.\n",
+ "\n",
+ "Note that below, **us-central1** is hardcoded as the location. This
is because of the limited number of
[locations](https://cloud.google.com/healthcare-api/docs/how-tos/hl7v2-messages)
the API currently supports."
+ ],
+ "metadata": {
+ "id": "tpePe_yOsdSJ"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Before running please set the following variables as arguments as
mentioned below\n"
+ ],
+ "metadata": {
+ "id": "_1Q3mw1usnoE"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "args = {'gcp_project':'xxx', #GCP project ID\n",
+ " 'gcp_region':'xxx', # GCP project region\n",
+ " 'temp_location':'gs://<YOUR Bucket>/tmp', #input location
where your HL7 messages are stored in GCS bucket\n",
+ " 'input_file':'gs://<YOUR Bucket>/my_message.hl7', #input
location where your HL7 messages are stored in GCS bucket\n",
+ " 'hcapi_project_id':'xxxxxx', #healthcare API project ID\n",
+ " 'hcapi_dataset':'xxxx', #healthcare dataset\n",
+ " 'hcapi_version':'v1', #healthcare API version by defualt
v1\n",
+ " 'hcapi_location':'xxxx', #healthcare API configured
location\n",
+ " 'hcapi_hl7_store':'xxx', #healthcare api hl7 store\n",
+ " 'hcapi_fhir_store':''}"
+ ],
+ "metadata": {
+ "id": "a722GbqdvgOX"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Google Cloud Healthcare (HCAPI) API Utils class**\n",
+ "\n",
+ "Below is the code snippet which describes the class having healthcare
API connections and configurations. Basic functionality, includes constructing
the hcapi_url as per the input parameters, clean the HL7 message in terms of
proper formatting and post hl7v2 message to hl7v2 store. You can add more
transformations as per your requirements."
+ ],
+ "metadata": {
+ "id": "NHzk8JIqxQoa"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import google.auth\n",
+ "import google.auth.transport.requests\n",
+ "import base64\n",
+ "import json\n",
+ "import hashlib\n",
+ "import requests\n",
+ "import logging\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.options.pipeline_options import SetupOptions\n",
+ "from apache_beam.testing.test_pipeline import TestPipeline\n",
+ "import apache_beam.runners.interactive.interactive_beam as ib\n",
+ "from apache_beam import io\n",
+ "\n",
+ "logging.basicConfig(level=logging.INFO, format='%(asctime)s ::
%(levelname)s :: %(message)s')\n",
+ "\n",
+ "class hcapi_cls:\n",
+ "\n",
+ " def __init__(self, args):\n",
+ " self.hcapi_hl7_store = str(args['hcapi_hl7_store'])\n",
+ " self.hcapi_project_id = str(args['hcapi_project_id'])\n",
+ " self.hcapi_version = str(args['hcapi_version'])\n",
+ " self.hcapi_location = str(args['hcapi_location'])\n",
+ " self.hcapi_dataset = str(args['hcapi_dataset'])\n",
+ " self.hcapi_fhir_store = str(args['hcapi_fhir_store'])\n",
+ " self.token = None\n",
+ "\n",
+ " def google_api_headers(self):\n",
+ " \"\"\" Function executes self and gets the token for the
request \"\"\"\n",
+ " logging.info(\"fetching token and refreshing
credentials\")\n",
+ " creds, project = google.auth.default()\n",
+ " auth_req = google.auth.transport.requests.Request()\n",
+ " creds.refresh(auth_req)\n",
+ " return {\n",
+ " \"Authorization\": f\"Bearer {creds.token}\",\n",
+ " \"Prefer\": \"handling=strict\"\n",
+ " }\n",
+ "\n",
+ " def hcapi_dataset_url(self, version=None, project=None,
location=None, dataset=None):\n",
+ " \"\"\" This function creates base hcapi dataset url and
returns it \"\"\"\n",
+ " base = 'https://healthcare.googleapis.com'\n",
+ " version = self.hcapi_version\n",
+ " project = self.hcapi_project_id\n",
+ " location = self.hcapi_location\n",
+ " dataset = self.hcapi_dataset\n",
+ " return
f'{base}/{version}/projects/{project}/locations/{location}/datasets/{dataset}'\n",
+ "\n",
+ " def hcapi_get(self, url):\n",
+ " \"\"\" Function to send get request to HCAPI \"\"\"\n",
+ " response = requests.get(url,
headers=self.google_api_headers())\n",
+ " if not response.ok:\n",
+ " raise Exception(f'Error with HC API
get:\\n{response.text}')\n",
+ " return response.json()\n",
+ "\n",
+ " def hcapi_post(self, url, data):\n",
+ " \"\"\" Function to send post request to HCAPI \"\"\"\n",
+ " response = requests.post(url,
headers=self.google_api_headers(), json=data)\n",
+ " if not response.ok:\n",
+ " raise Exception(f'Error with HC API
post:\\n{response.text}')\n",
+ " return response.json()\n",
+ "\n",
+ " def hcapi_delete(self, url):\n",
+ " \"\"\" Function to send delete request to HCAPI \"\"\"\n",
+ " response = requests.delete(url,
headers=self.google_api_headers())\n",
+ " if not response.ok:\n",
+ " raise Exception(f'Error with HC API
get:\\n{response.text}')\n",
+ " return response.json()\n",
+ "\n",
+ " def hcapi_hl7_url(self, version=None, project=None,
location=None, dataset=None, store=None):\n",
+ " \"\"\" This function creates hcapi hl7V2store url and returns
the url \"\"\"\n",
+ " base_url = self.hcapi_dataset_url(version=version,
project=project,\n",
+ " location=location,
dataset=dataset)\n",
+ " hl7_store = self.hcapi_hl7_store\n",
+ " return f'{base_url}/hl7V2Stores/{hl7_store}'\n",
+ "\n",
+ " def get_hl7_message(self, message_id):\n",
+ " \"\"\" Function to get message from HL7v2 store using HCAPI
URL \"\"\"\n",
+ " url = f'{self.hcapi_hl7_url()}/messages/{message_id}'\n",
+ " return self.hcapi_get(url)\n",
+ "\n",
+ " def post_hl7_message(self, payload):\n",
+ " \"\"\" Function to post messages to HL7v2 store \"\"\"\n",
+ " url = f'{self.hcapi_hl7_url()}/messages'\n",
+ " return self.hcapi_post(url, payload)\n",
+ "\n",
+ " def message_to_hl7_store(self, message):\n",
+ " \"\"\" Function to clean up Hl7 messages with \\r seperator
before posting to HCAPI \"\"\"\n",
+ " messase =str(message)\n",
+ " message = message.replace('\\n', '\\r')\n",
+ " message = message.replace('\\\\r', '\\r')\n",
+ " message = message.replace('\\r\\r', '\\r')\n",
+ " encoded = base64.b64encode(str(message).encode())\n",
+ " payload = {\n",
+ " \"message\": {\n",
+ " \"data\": encoded.decode()\n",
+ " }\n",
+ " }\n",
+ " return self.post_hl7_message(payload)\n",
+ "\n",
+ " def hcapi_fhir_url(self, version=None, project=None,
location=None, dataset=None, store=None):\n",
+ " \"\"\" This function creates hcapi fhir store url and returns
it \"\"\"\n",
+ " base_url = self.hcapi_dataset_url(version=version,
project=project,\n",
+ " location=location,
dataset=dataset)\n",
+ " if store is None:\n",
+ " raise Exception('No FHIR store specified')\n",
+ " return f'{base_url}/fhirStores/{store}/fhir'\n",
+ "\n",
+ " def hcapi_fhir_request(self, store_key, query, data={},
method='GET'):\n",
+ " \"\"\" Function to send post request to HCAPI FHIR store
\"\"\"\n",
+ " store = self.hcapi_fhir_store\n",
+ " if not store:\n",
+ " raise Exception(f\"Couldn't FHIR find store named
{store_key} in config\")\n",
+ " url = self.hcapi_fhir_url(store=store)\n",
+ " url = f'{url}/{query}' if query else url\n",
+ " get = lambda q, d: self.hcapi_get(url)\n",
+ " post = lambda q, d: self.hcapi_post(url, data)\n",
+ " delete = lambda q, d: self.hcapi_delete(url)\n",
+ " return {'GET': get, 'POST': post, 'DELETE' :
delete}[method](query, data)\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "H7g4_-rGS9P_"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Pipeline Setup**\n",
+ "\n",
+ "We will use InteractiveRunner in this notebook.\n",
+ "Following are the DoFn classes which carry out its respective
operations"
+ ],
+ "metadata": {
+ "id": "lXnzAtbHyUd2"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "The following class **BuildFileName** takes the file name from the
element and converts its into string. You can enhance this class to construct
GCS bucket URL, if your GCS bucket prefix remains constant."
+ ],
+ "metadata": {
+ "id": "TKnL8kxh3Kms"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "class BuildFileName(beam.DoFn):\n",
+ " \"\"\" Class to get file name from variable and returns the
filename \"\"\"\n",
+ " def process(self, element):\n",
+ " logging.info(\"processing the following file:
{}\".format(element))\n",
+ " file_path = str(element)\n",
+ " yield file_path"
+ ],
+ "metadata": {
+ "id": "N01E3dQd3Jr3"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "The following class **BuildMessages** takes the GCS URL from the
above class reads it, sepeartes out each message, appends into a list and
return list for the next class."
+ ],
+ "metadata": {
+ "id": "Jej68R8w3i2Z"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "class BuildMessages(beam.DoFn):\n",
+ " \"\"\" Class to read file, clean and seperate messgaes based on
MSH\"\"\"\n",
+ " def process(self, file_name):\n",
+ " try:\n",
+ " logging.info(\"starting to read file:
{}\".format(file_name))\n",
+ " file = io.gcsio.GcsIO().open(filename=file_name,
mode='r')\n",
+ " read_file = file.read()\n",
+ " new_file = str(read_file,
encoding='utf-8').replace('\\n', '\\r')\n",
+ " logging.info(\"starting to seperate HL7 messages into
list\")\n",
+ " messages=[]\n",
+ " for line in new_file.split('\\r'):\n",
+ " if line[:3] =='MSH':\n",
+ " messages.append(line)\n",
+ " else:\n",
+ " messages[-1]+= line\n",
+ "\n",
+ "\n",
+ " logging.info(\"total number of messages parsed are
{}\".format(len(messages)))\n",
+ " return messages\n",
+ " except Exception as error:\n",
+ " logging.error(\"got the following error while processing
: {}\".format('\\n'+str(error)))\n",
+ " raise Exception\n",
+ "\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "MC6tr_sGyNKG"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "The following class **PostToHL7V2Store** takes the messages return in
the earlier class and POST each messages to Hl7v2 store ."
+ ],
+ "metadata": {
+ "id": "1hpuoUGA33jo"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "class PostToHL7V2Store(beam.DoFn):\n",
+ " \"\"\" Class to read file, clean and seperate messgaes based on
MSH\"\"\"\n",
+ " def process(self, element):\n",
+ " try:\n",
+ " logging.info(\"starting to prepare and post message\")\n",
+ " hl7v2_store_response =
hcapi.message_to_hl7_store(element)\n",
+ " message_id =
hl7v2_store_response['name'].split(\"/\")[-1]\n",
+ " logging.info(\"successfully posted message to Hl7v2 store
with message id :- {}\".format(message_id))\n",
+ "\n",
+ " # \"Getting the messagge from HL7V2 store for testing
purpose\n",
+ " #message = nb.get_hl7_message(message_id)\n",
+ " #print(message)\n",
+ "\n",
Review Comment:
@damccorm Removed the code block to avoid any confusion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]