This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5d12bd7c37b Healthcare NLP Colab Example (#27516)
5d12bd7c37b is described below
commit 5d12bd7c37b55693dc22a1a9b12c0df5299dff6e
Author: Svetak Sundhar <[email protected]>
AuthorDate: Mon Jul 24 11:15:18 2023 -0400
Healthcare NLP Colab Example (#27516)
* Created using Colaboratory
* Created using Colaboratory
* Delete tree/healthcarenlp/examples directory
* Created using Colaboratory
* Delete beam_nlp.ipynb
* Created using Colaboratory
* Created using Colaboratory
* Created using Colaboratory
* Delete beam_nlp.ipynb
* Created using Colaboratory
* remove specific output
* link to colab
* colab url
* refactor
* url
* updated
* url
* remove gcs bucket
* url
---
examples/notebooks/healthcare/beam_nlp.ipynb | 924 +++++++++++++++++++++++++++
1 file changed, 924 insertions(+)
diff --git a/examples/notebooks/healthcare/beam_nlp.ipynb
b/examples/notebooks/healthcare/beam_nlp.ipynb
new file mode 100644
index 00000000000..a1a77867025
--- /dev/null
+++ b/examples/notebooks/healthcare/beam_nlp.ipynb
@@ -0,0 +1,924 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": [],
+ "include_colab_link": true
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "view-in-github",
+ "colab_type": "text"
+ },
+ "source": [
+ "<a
href=\"https://colab.research.google.com/github/apache/beam/blob/healthcarenlp/examples/notebooks/healthcare/beam_nlp.ipynb\"
target=\"_parent\"><img
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In
Colab\"/></a>"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "id": "lBuUTzxD2mvJ",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# **Natural Language Processing Pipeline**\n",
+ "\n",
+ "**Note**: This example is used from
[here](https://github.com/rasalt/healthcarenlp/blob/main/nlp_public.ipynb).\n",
+ "\n",
+ "\n",
+ "\n",
+ "This example demonstrates how to set up an Apache Beam pipeline that
reads a file from [Google Cloud
Storage](https://https://cloud.google.com/storage), and calls the [Google Cloud
Healthcare NLP API](https://cloud.google.com/healthcare-api/docs/how-tos/nlp)
to extract information from unstructured data. This application can be used in
contexts such as reading scanned clinical documents and extracting structure
from it.\n",
+ "\n",
+ "An Apache Beam pipeline is a pipeline that reads input data,
transforms that data, and writes output data. It consists of PTransforms and
PCollections. A PCollection represents a distributed data set that your Beam
pipeline operates on. A PTransform represents a data processing operation, or a
step, in your pipeline. It takes one or more PCollections as input, performs a
processing function that you provide on the elements of that PCollection, and
produces zero or more output PC [...]
+ "\n",
+ "For details about Apache Beam pipelines, including PTransforms and
PCollections, visit the [Beam Programming
Guide](https://beam.apache.org/documentation/programming-guide/).\n",
+ "\n",
+ "You'll be able to use this notebook to explore the data in each
PCollection."
+ ],
+ "metadata": {
+ "id": "nEUAYCTx4Ijj"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "First, lets install the necessary packages."
+ ],
+ "metadata": {
+ "id": "ZLBB0PTG5CHw"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "!pip install apache-beam[gcp]"
+ ],
+ "metadata": {
+ "id": "O7hq2sse8K4u"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
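+ {
+ "cell_type": "markdown",
+ "source": [
+ "As a quick illustration of the PCollection and PTransform concepts above,
the following throwaway cell is a minimal sketch (it is not part of the
healthcare example): `beam.Create` produces a PCollection from an in-memory
list, and each `|` applies a PTransform that yields a new PCollection."
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import apache_beam as beam\n",
+ "\n",
+ "# Throwaway pipeline: Create yields a PCollection; each | applies a\n",
+ "# PTransform and produces a new PCollection.\n",
+ "with beam.Pipeline() as demo:\n",
+ "    (demo\n",
+ "     | beam.Create(['a', 'b', 'c'])\n",
+ "     | beam.Map(str.upper)\n",
+ "     | beam.Map(print))"
+ ],
+ "metadata": {},
+ "execution_count": null,
+ "outputs": []
+ },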
+ {
+ "cell_type": "markdown",
+ "source": [
+ " **GCP Setup**"
+ ],
+ "metadata": {
+ "id": "5vQDhIv0E-LR"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "1. Authenticate your notebook by `gcloud auth application-default
login` in the Colab terminal.\n",
+ "\n",
+ "2. Run `gcloud config set project <YOUR-PROJECT>`"
+ ],
+ "metadata": {
+ "id": "DGYiBYfxsSCw"
+ }
+ },
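+ {
+ "cell_type": "markdown",
+ "source": [
+ "If you are running this notebook in Colab, the following cell is an
in-notebook alternative to the terminal flow above (an addition to the
original steps; it uses Colab's standard auth helper):"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# In-notebook alternative to the gcloud terminal flow (Colab only).\n",
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {},
+ "execution_count": null,
+ "outputs": []
+ },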
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Set the variables in the next cell based upon your project and
preferences. The files referred to in this notebook nlpsample*.csv are in the
format with one\n",
+ "blurb of clinical note.\n",
+ "\n",
+ "Note that below, **us-central1** is hardcoded as the location. This
is because of the limited number of
[locations](https://cloud.google.com/healthcare-api/docs/how-tos/nlp) the API
currently supports."
+ ],
+ "metadata": {
+ "id": "D7lJqW2PRFcN"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "DATASET=\"<YOUR-BQ-DATASEST>\"\n",
+ "TEMP_LOCATION=\"<YOUR-TEMP-LOCATION>\"\n",
+ "PROJECT='<YOUR-PROJECT>'\n",
+ "LOCATION='us-central1'\n",
+
"URL=f'https://healthcare.googleapis.com/v1/projects/{PROJECT}/locations/{LOCATION}/services/nlp:analyzeEntities'\n",
+ "NLP_SERVICE=f'projects/{PROJECT}/locations/{LOCATION}/services/nlp'"
+ ],
+ "metadata": {
+ "id": "s9lhe5CZ5F3o"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Then, download [this raw CSV
file](https://https://github.com/socd06/medical-nlp/blob/master/data/test.csv),
and then upload it into Colab. You should be able to view this file
(*test.csv*) in the \"Files\" tab in Colab after uploading."
+ ],
+ "metadata": {
+ "id": "1IArtEm8QuCR"
+ }
+ },
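+ {
+ "cell_type": "markdown",
+ "source": [
+ "The following cell fetches the file straight into the Colab runtime (an
assumption: it uses the raw.githubusercontent.com URL that corresponds to the
GitHub page linked above):"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "!wget -O test.csv https://raw.githubusercontent.com/socd06/medical-nlp/master/data/test.csv"
+ ],
+ "metadata": {},
+ "execution_count": null,
+ "outputs": []
+ },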
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**BigQuery Setup**\n",
+ "\n",
+ "We will be using BigQuery to warehouse the structured data revealed
in the output of the Healthcare NLP API. For this purpose, we create 3 tables
to organize the data. Specifically, these will be table entities, table
relations, and table entity mentions, which are all outputs of interest from
the Healthcare NLP API."
+ ],
+ "metadata": {
+ "id": "DI_Qkyn75LO-"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.cloud import bigquery\n",
+ "\n",
+ "# Construct a BigQuery client object.\n",
+ "\n",
+ "TABLE_ENTITY=\"entity\"\n",
+ "\n",
+ "\n",
+ "schemaEntity = [\n",
+ " bigquery.SchemaField(\"entityId\", \"STRING\",
mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\"preferredTerm\", \"STRING\",
mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\"vocabularyCodes\", \"STRING\",
mode=\"REPEATED\"),\n",
+ "]\n",
+ "\n",
+ "\n",
+ "client = bigquery.Client()\n",
+ "\n",
+ "# Create Table IDs\n",
+ "table_ent = PROJECT+\".\"+DATASET+\".\"+TABLE_ENTITY\n",
+ "\n",
+ "\n",
+ "# If table exists, delete the tables.\n",
+ "client.delete_table(table_ent, not_found_ok=True)\n",
+ "\n",
+ "\n",
+ "# Create tables\n",
+ "\n",
+ "table = bigquery.Table(table_ent, schema=schemaEntity)\n",
+ "table = client.create_table(table) # Make an API request.\n",
+ "\n",
+ "print(\n",
+ " \"Created table {}.{}.{}\".format(table.project,
table.dataset_id, table.table_id)\n",
+ ")"
+ ],
+ "metadata": {
+ "id": "bZDqtFVE5Wd_"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.cloud import bigquery\n",
+ "\n",
+ "# Construct a BigQuery client object.\n",
+ "\n",
+ "TABLE_REL=\"relations\"\n",
+ "\n",
+ "schemaRelations = [\n",
+ " bigquery.SchemaField(\"subjectId\", \"STRING\",
mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\"objectId\", \"STRING\",
mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\"confidence\", \"FLOAT64\",
mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\"id\", \"STRING\", mode=\"NULLABLE\"),\n",
+ "]\n",
+ "\n",
+ "client = bigquery.Client()\n",
+ "\n",
+ "# Create Table IDs\n",
+ "\n",
+ "table_rel = PROJECT+\".\"+DATASET+\".\"+TABLE_REL\n",
+ "\n",
+ "# If table exists, delete the tables.\n",
+ "\n",
+ "client.delete_table(table_rel, not_found_ok=True)\n",
+ "\n",
+ "# Create tables\n",
+ "\n",
+ "table = bigquery.Table(table_rel, schema=schemaRelations)\n",
+ "table = client.create_table(table) # Make an API request.\n",
+ "print(\n",
+ " \"Created table {}.{}.{}\".format(table.project,
table.dataset_id, table.table_id)\n",
+ ")\n",
+ "\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "YK-G7uV5APuP"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.cloud import bigquery\n",
+ "\n",
+ "# Construct a BigQuery client object.\n",
+ "\n",
+ "TABLE_ENTITYMENTIONS=\"entitymentions\"\n",
+ "\n",
+ "schemaEntityMentions = [\n",
+ " bigquery.SchemaField(\"mentionId\", \"STRING\",
mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\"type\", \"STRING\", mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\n",
+ " \"text\",\n",
+ " \"RECORD\",\n",
+ " mode=\"NULLABLE\",\n",
+ " fields=[\n",
+ " bigquery.SchemaField(\"content\", \"STRING\",
mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\"beginOffset\", \"INTEGER\",
mode=\"NULLABLE\"),\n",
+ " ],\n",
+ " ),\n",
+ " bigquery.SchemaField(\n",
+ " \"linkedEntities\",\n",
+ " \"RECORD\",\n",
+ " mode=\"REPEATED\",\n",
+ " fields=[\n",
+ " bigquery.SchemaField(\"entityId\", \"STRING\",
mode=\"NULLABLE\"),\n",
+ " ],\n",
+ " ),\n",
+ " bigquery.SchemaField(\n",
+ " \"temporalAssessment\",\n",
+ " \"RECORD\",\n",
+ " mode=\"NULLABLE\",\n",
+ " fields=[\n",
+ " bigquery.SchemaField(\"value\", \"STRING\",
mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\"confidence\", \"FLOAT64\",
mode=\"NULLABLE\"),\n",
+ " ],\n",
+ " ),\n",
+ " bigquery.SchemaField(\n",
+ " \"certaintyAssessment\",\n",
+ " \"RECORD\",\n",
+ " mode=\"NULLABLE\",\n",
+ " fields=[\n",
+ " bigquery.SchemaField(\"value\", \"STRING\",
mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\"confidence\", \"FLOAT64\",
mode=\"NULLABLE\"),\n",
+ " ],\n",
+ " ),\n",
+ " bigquery.SchemaField(\n",
+ " \"subject\",\n",
+ " \"RECORD\",\n",
+ " mode=\"NULLABLE\",\n",
+ " fields=[\n",
+ " bigquery.SchemaField(\"value\", \"STRING\",
mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\"confidence\", \"FLOAT64\",
mode=\"NULLABLE\"),\n",
+ " ],\n",
+ " ),\n",
+ " bigquery.SchemaField(\"confidence\", \"FLOAT64\",
mode=\"NULLABLE\"),\n",
+ " bigquery.SchemaField(\"id\", \"STRING\", mode=\"NULLABLE\")\n",
+ "]\n",
+ "\n",
+ "client = bigquery.Client()\n",
+ "\n",
+ "# Create Table IDs\n",
+ "\n",
+ "table_mentions = PROJECT+\".\"+DATASET+\".\"+TABLE_ENTITYMENTIONS\n",
+ "\n",
+ "# If table exists, delete the tables.\n",
+ "\n",
+ "client.delete_table(table_mentions, not_found_ok=True)\n",
+ "\n",
+ "# Create tables\n",
+ "\n",
+ "table = bigquery.Table(table_mentions,
schema=schemaEntityMentions)\n",
+ "table = client.create_table(table) # Make an API request.\n",
+ "print(\n",
+ " \"Created table {}.{}.{}\".format(table.project,
table.dataset_id, table.table_id)\n",
+ ")"
+ ],
+ "metadata": {
+ "id": "R9IHgZKoAQWj"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Pipeline Setup**\n",
+ "\n",
+ "We will use InteractiveRunner in this notebook."
+ ],
+ "metadata": {
+ "id": "jc_iS_BP5aS4"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Python's regular expression library\n",
+ "import re\n",
+ "from sys import argv\n",
+ "# Beam and interactive Beam imports\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.runners.interactive.interactive_runner import
InteractiveRunner\n",
+ "import apache_beam.runners.interactive.interactive_beam as ib\n",
+ "\n",
+ "#Reference
https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#python_1\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "\n",
+ "runnertype = \"InteractiveRunner\"\n",
+ "\n",
+ "options = PipelineOptions(\n",
+ " flags=argv,\n",
+ " runner=runnertype,\n",
+ " project=PROJECT,\n",
+ " job_name=\"my-healthcare-nlp-job\",\n",
+ " temp_location=TEMP_LOCATION,\n",
+ " region=LOCATION)"
+ ],
+ "metadata": {
+ "id": "07ct6kf55ihP"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "The following defines a `PTransform` named `ReadLinesFromText`, that
extracts lines from a file."
+ ],
+ "metadata": {
+ "id": "dO1A9_WK5lb4"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "class ReadLinesFromText(beam.PTransform):\n",
+ "\n",
+ " def __init__(self, file_pattern):\n",
+ " self._file_pattern = file_pattern\n",
+ "\n",
+ " def expand(self, pcoll):\n",
+ " return (pcoll.pipeline\n",
+ " | beam.io.ReadFromText(self._file_pattern))"
+ ],
+ "metadata": {
+ "id": "t5iDRKMK5n_B"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "The following sets up an Apache Beam pipeline with the *Interactive
Runner*. The *Interactive Runner* is the runner suitable for running in
notebooks. A runner is an execution engine for Apache Beam pipelines."
+ ],
+ "metadata": {
+ "id": "HI_HVB185sMQ"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "p = beam.Pipeline(options = options)"
+ ],
+ "metadata": {
+ "id": "7osCZ1om5ql0"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "The following sets up a PTransform that extracts words from a Google
Cloud Storage file that contains lines with each line containing a In our
example, each line is a medical notes excerpt that will be passed through the
Healthcare NLP API\n",
+ "\n",
+ "**\"|\"** is an overloaded operator that applies a PTransform to a
PCollection to produce a new PCollection. Together with |, >> allows you to
optionally name a PTransform.\n",
+ "\n",
+ "Usage:[PCollection] | [PTransform], **or** [PCollection] | [name] >>
[PTransform]"
+ ],
+ "metadata": {
+ "id": "EaF8NfC_521y"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "lines = p | 'read' >> ReadLinesFromText(\"test.csv\")"
+ ],
+ "metadata": {
+ "id": "2APAh6XQ6NYd",
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 72
+ },
+ "outputId": "033c5110-fd5a-4da0-b59b-801a1ce9d3b1"
+ },
+ "execution_count": null,
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stderr",
+ "text": [
+
"WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies
required for Interactive Beam PCollection visualization are not available,
please use: `pip install apache-beam[interactive]` to install necessary
dependencies to enable all data visualization features.\n"
+ ]
+ },
+ {
+ "output_type": "display_data",
+ "data": {
+ "application/javascript": [
+ "\n",
+ " if (typeof window.interactive_beam_jquery ==
'undefined') {\n",
+ " var jqueryScript =
document.createElement('script');\n",
+ " jqueryScript.src =
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n",
+ " jqueryScript.type = 'text/javascript';\n",
+ " jqueryScript.onload = function() {\n",
+ " var datatableScript =
document.createElement('script');\n",
+ " datatableScript.src =
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n",
+ " datatableScript.type = 'text/javascript';\n",
+ " datatableScript.onload = function() {\n",
+ " window.interactive_beam_jquery =
jQuery.noConflict(true);\n",
+ "
window.interactive_beam_jquery(document).ready(function($){\n",
+ " \n",
+ " });\n",
+ " }\n",
+ " document.head.appendChild(datatableScript);\n",
+ " };\n",
+ " document.head.appendChild(jqueryScript);\n",
+ " } else {\n",
+ "
window.interactive_beam_jquery(document).ready(function($){\n",
+ " \n",
+ " });\n",
+ " }"
+ ]
+ },
+ "metadata": {}
+ },
+ {
+ "output_type": "stream",
+ "name": "stderr",
+ "text": [
+ "WARNING:apache_beam.options.pipeline_options:Discarding
unparseable args:
['/usr/local/lib/python3.10/dist-packages/ipykernel_launcher.py', '-f',
'/root/.local/share/jupyter/runtime/kernel-0f4aa514-58fe-4223-91cc-be29c560c5ff.json']\n"
+ ]
+ }
+ ]
+ },
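+ {
+ "cell_type": "markdown",
+ "source": [
+ "For illustration, the following throwaway cell (separate from this
notebook's pipeline) shows both application forms, unnamed and named:"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import apache_beam as beam\n",
+ "\n",
+ "# Unnamed and named application of PTransforms on a throwaway pipeline.\n",
+ "with beam.Pipeline() as demo:\n",
+ "    nums = demo | beam.Create([1, 2, 3])  # [PCollection] | [PTransform]\n",
+ "    doubled = nums | 'double' >> beam.Map(lambda x: x * 2)  # [name] >> [PTransform]\n",
+ "    doubled | 'show' >> beam.Map(print)"
+ ],
+ "metadata": {},
+ "execution_count": null,
+ "outputs": []
+ },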
+ {
+ "cell_type": "markdown",
+ "source": [
+ "We then write a **DoFn** that will invoke the [NLP
API](https://cloud.google.com/healthcare-api/docs/how-tos/nlp)."
+ ],
+ "metadata": {
+ "id": "vM_FbhkbGI-E"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "class InvokeNLP(beam.DoFn):\n",
+ "\n",
+ " def process(self, element):\n",
+ " # import requests\n",
+ " import uuid\n",
+ " from google.auth import compute_engine\n",
+ " credentials = compute_engine.Credentials()\n",
+ " from google.auth.transport.requests import
AuthorizedSession\n",
+ " authed_session = AuthorizedSession(credentials)\n",
+ " url = URL\n",
+ " payload = {\n",
+ " 'nlp_service': NLP_SERVICE,\n",
+ " 'document_content': element\n",
+ " }\n",
+ " resp = authed_session.post(url, data=payload)\n",
+ " response = resp.json()\n",
+ " response['id'] = uuid.uuid4().hex[:8]\n",
+ " yield response\n",
+ "\n",
+ "class AnalyzeLines(beam.PTransform):\n",
+ " def expand(self, pcoll):\n",
+ " return (\n",
+ " pcoll\n",
+ " | \"Invoke NLP API\" >> beam.ParDo(InvokeNLP())\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "3ZJ-0dex9WE5"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "From our elements, being processed, we will get the entity mentions,
relationships, and entities respectively."
+ ],
+ "metadata": {
+ "id": "TeYxIlNgGdK0"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import json\n",
+ "from apache_beam import pvalue\n",
+ "\n",
+ "class breakUpEntities(beam.DoFn):\n",
+ " def process(self, element):\n",
+ " for e in element['entities']:\n",
+ " print(e)\n",
+ " yield e\n",
+ "\n",
+ "class getRelationships(beam.DoFn):\n",
+ " def process(self, element):\n",
+ " obj = {}\n",
+ " id = element['id']\n",
+ " for e in element['relationships']:\n",
+ " obj = e\n",
+ " obj['id'] = id\n",
+ " yield obj\n",
+ "\n",
+ "class getEntityMentions(beam.DoFn):\n",
+ " def process(self, element):\n",
+ " obj = {}\n",
+ " for e in element['entityMentions']:\n",
+ " e['id'] = element['id']\n",
+ " yield e\n"
+ ],
+ "metadata": {
+ "id": "3KZgUv3d6haf"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from apache_beam.io.gcp.internal.clients import bigquery\n",
+ "\n",
+ "\n",
+ "table_spec = bigquery.TableReference(\n",
+ " projectId=PROJECT,\n",
+ " datasetId=DATASET,\n",
+ " tableId=TABLE_ENTITY)\n",
+ "\n",
+ "nlp_annotations = (lines\n",
+ " | \"Analyze\" >> AnalyzeLines()\n",
+ " )\n"
+ ],
+ "metadata": {
+ "id": "OkxgB2a-6iYN"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "We then write these results to
[BigQuery](https://cloud.google.com/bigquery), a cloud data warehouse."
+ ],
+ "metadata": {
+ "id": "iTh65CXIGoQn"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "resultsEntities = ( nlp_annotations\n",
+ " | \"Break\" >> beam.ParDo(breakUpEntities())\n",
+ " | \"WriteToBigQuery\" >> beam.io.WriteToBigQuery(\n",
+ " table_spec,\n",
+ "
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,\n",
+ "
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "Q9GIyLeS6oAe"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "table_spec = bigquery.TableReference(\n",
+ " projectId=PROJECT,\n",
+ " datasetId=DATASET,\n",
+ " tableId=TABLE_REL)\n",
+ "\n",
+ "resultsRelationships = ( nlp_annotations\n",
+ " | \"GetRelationships\" >>
beam.ParDo(getRelationships())\n",
+ " | \"WriteToBigQuery\" >> beam.io.WriteToBigQuery(\n",
+ " table_spec,\n",
+ "
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,\n",
+ "
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "yOlHfkcT6s4y"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "table_spec = bigquery.TableReference(\n",
+ " projectId=PROJECT,\n",
+ " datasetId=DATASET,\n",
+ " tableId=TABLE_ENTITYMENTIONS)\n",
+ "\n",
+ "resultsEntityMentions = ( nlp_annotations\n",
+ " | \"GetEntityMentions\" >>
beam.ParDo(getEntityMentions())\n",
+ " | \"WriteToBigQuery\" >> beam.io.WriteToBigQuery(\n",
+ " table_spec,\n",
+ "
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,\n",
+ "
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "a6QxxnY890Za"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "You can see the job graph for the pipeline by doing:"
+ ],
+ "metadata": {
+ "id": "6rP2nO6Z60bt"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "ib.show_graph(p)"
+ ],
+ "metadata": {
+ "id": "zQB5h1Zq6x8d",
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 806
+ },
+ "outputId": "7885e493-fee8-402e-baf2-cbbf406a3eb9"
+ },
+ "execution_count": null,
+ "outputs": [
+ {
+ "output_type": "display_data",
+ "data": {
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ],
+ "text/html": [
+ "\n",
+ " <link rel=\"stylesheet\"
href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\"
integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\"
crossorigin=\"anonymous\">\n",
+ " <div
id=\"progress_indicator_fa6997b180fa86966dd888a7d59a34f7\">\n",
+ " <div class=\"spinner-border text-info\"
role=\"status\"></div>\n",
+ " <span class=\"text-info\">Processing...
show_graph</span>\n",
+ " </div>\n",
+ " "
+ ]
+ },
+ "metadata": {}
+ },
+ {
+ "output_type": "display_data",
+ "data": {
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ],
+ "text/html": [
+ "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
+ "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
+ " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
+ "<!-- Generated by graphviz version 2.43.0 (0)\n",
+ " -->\n",
+ "<!-- Title: G Pages: 1 -->\n",
+ "<svg width=\"481pt\" height=\"592pt\"\n",
+ " viewBox=\"0.00 0.00 480.50 592.48\"
xmlns=\"http://www.w3.org/2000/svg\"
xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
+ "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1)
rotate(0) translate(4 588.48)\">\n",
+ "<title>G</title>\n",
+ "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4
-4,-588.48 476.5,-588.48 476.5,4 -4,4\"/>\n",
+ "<!-- [10]: read -->\n",
+ "<g id=\"node1\" class=\"node\">\n",
+ "<title>[10]: read</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"268,-584.48
198,-584.48 198,-548.48 268,-548.48 268,-584.48\"/>\n",
+ "<text text-anchor=\"middle\" x=\"233\" y=\"-562.78\"
font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]:
read</text>\n",
+ "</g>\n",
+ "<!-- lines -->\n",
+ "<g id=\"node2\" class=\"node\">\n",
+ "<title>lines</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"233\"
cy=\"-485.19\" rx=\"27.1\" ry=\"27.1\"/>\n",
+ "<text text-anchor=\"middle\" x=\"233\" y=\"-481.49\"
font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">lines</text>\n",
+ "</g>\n",
+ "<!-- [10]: read->lines -->\n",
+ "<g id=\"edge1\" class=\"edge\">\n",
+ "<title>[10]: read->lines</title>\n",
+ "<path fill=\"none\" stroke=\"black\"
d=\"M233,-548.28C233,-540.81 233,-531.77 233,-522.91\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"236.5,-522.72
233,-512.72 229.5,-522.72 236.5,-522.72\"/>\n",
+ "</g>\n",
+ "<!-- [13]: Analyze -->\n",
+ "<g id=\"node3\" class=\"node\">\n",
+ "<title>[13]: Analyze</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"278.5,-421.89
187.5,-421.89 187.5,-385.89 278.5,-385.89 278.5,-421.89\"/>\n",
+ "<text text-anchor=\"middle\" x=\"233\" y=\"-400.19\"
font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[13]:
Analyze</text>\n",
+ "</g>\n",
+ "<!-- lines->[13]: Analyze -->\n",
+ "<g id=\"edge2\" class=\"edge\">\n",
+ "<title>lines->[13]: Analyze</title>\n",
+ "<path fill=\"none\" stroke=\"black\"
d=\"M233,-457.78C233,-449.64 233,-440.65 233,-432.43\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"236.5,-432.2
233,-422.2 229.5,-432.2 236.5,-432.2\"/>\n",
+ "</g>\n",
+ "<!-- nlp_annotations -->\n",
+ "<g id=\"node4\" class=\"node\">\n",
+ "<title>nlp_annotations</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"233\"
cy=\"-282.94\" rx=\"66.89\" ry=\"66.89\"/>\n",
+ "<text text-anchor=\"middle\" x=\"233\" y=\"-279.24\"
font-family=\"Times,serif\" font-size=\"14.00\"
fill=\"blue\">nlp_annotations</text>\n",
+ "</g>\n",
+ "<!-- [13]: Analyze->nlp_annotations -->\n",
+ "<g id=\"edge3\" class=\"edge\">\n",
+ "<title>[13]: Analyze->nlp_annotations</title>\n",
+ "<path fill=\"none\" stroke=\"black\"
d=\"M233,-385.85C233,-378.65 233,-369.72 233,-360.16\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"236.5,-360.02
233,-350.02 229.5,-360.02 236.5,-360.02\"/>\n",
+ "</g>\n",
+ "<!-- [14]: Break -->\n",
+ "<g id=\"node5\" class=\"node\">\n",
+ "<title>[14]: Break</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"136.5,-180
57.5,-180 57.5,-144 136.5,-144 136.5,-180\"/>\n",
+ "<text text-anchor=\"middle\" x=\"97\" y=\"-158.3\"
font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[14]:
Break</text>\n",
+ "</g>\n",
+ "<!-- nlp_annotations->[14]: Break -->\n",
+ "<g id=\"edge4\" class=\"edge\">\n",
+ "<title>nlp_annotations->[14]: Break</title>\n",
+ "<path fill=\"none\" stroke=\"black\"
d=\"M182.75,-237.99C162.98,-220.7 141.06,-201.53 124.33,-186.9\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"126.4,-184.06
116.57,-180.11 121.79,-189.33 126.4,-184.06\"/>\n",
+ "</g>\n",
+ "<!-- [15]: GetRelationships -->\n",
+ "<g id=\"node8\" class=\"node\">\n",
+ "<title>[15]: GetRelationships</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"303,-180
163,-180 163,-144 303,-144 303,-180\"/>\n",
+ "<text text-anchor=\"middle\" x=\"233\" y=\"-158.3\"
font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[15]:
GetRelationships</text>\n",
+ "</g>\n",
+ "<!-- nlp_annotations->[15]: GetRelationships -->\n",
+ "<g id=\"edge5\" class=\"edge\">\n",
+ "<title>nlp_annotations->[15]:
GetRelationships</title>\n",
+ "<path fill=\"none\" stroke=\"black\"
d=\"M233,-215.86C233,-206.85 233,-198.04 233,-190.28\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"236.5,-190.09
233,-180.09 229.5,-190.09 236.5,-190.09\"/>\n",
+ "</g>\n",
+ "<!-- [16]: GetEntityMentions -->\n",
+ "<g id=\"node11\" class=\"node\">\n",
+ "<title>[16]: GetEntityMentions</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"472.5,-180
321.5,-180 321.5,-144 472.5,-144 472.5,-180\"/>\n",
+ "<text text-anchor=\"middle\" x=\"397\" y=\"-158.3\"
font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[16]:
GetEntityMentions</text>\n",
+ "</g>\n",
+ "<!-- nlp_annotations->[16]: GetEntityMentions -->\n",
+ "<g id=\"edge6\" class=\"edge\">\n",
+ "<title>nlp_annotations->[16]:
GetEntityMentions</title>\n",
+ "<path fill=\"none\" stroke=\"black\"
d=\"M287.15,-242.67C312.85,-224.03 342.69,-202.39 364.78,-186.37\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"367.1,-189.01
373.14,-180.31 362.99,-183.35 367.1,-189.01\"/>\n",
+ "</g>\n",
+ "<!-- pcoll3490 -->\n",
+ "<g id=\"node6\" class=\"node\">\n",
+ "<title>pcoll3490</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"81\" cy=\"-90\"
rx=\"18\" ry=\"18\"/>\n",
+ "</g>\n",
+ "<!-- [14]: Break->pcoll3490 -->\n",
+ "<g id=\"edge7\" class=\"edge\">\n",
+ "<title>[14]: Break->pcoll3490</title>\n",
+ "<path fill=\"none\" stroke=\"black\"
d=\"M93.04,-143.7C91.24,-135.78 89.05,-126.23 87.04,-117.44\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"90.44,-116.59
84.8,-107.62 83.62,-118.15 90.44,-116.59\"/>\n",
+ "</g>\n",
+ "<!-- [14]: WriteToBigQuery -->\n",
+ "<g id=\"node7\" class=\"node\">\n",
+ "<title>[14]: WriteToBigQuery</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"144,-36 0,-36
0,0 144,0 144,-36\"/>\n",
+ "<text text-anchor=\"middle\" x=\"72\" y=\"-14.3\"
font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[14]:
WriteToBigQuery</text>\n",
+ "</g>\n",
+ "<!-- pcoll3490->[14]: WriteToBigQuery -->\n",
+ "<g id=\"edge8\" class=\"edge\">\n",
+ "<title>pcoll3490->[14]: WriteToBigQuery</title>\n",
+ "<path fill=\"none\" stroke=\"black\"
d=\"M78.82,-72.05C77.83,-64.35 76.63,-55.03 75.52,-46.36\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"78.97,-45.75
74.22,-36.28 72.02,-46.64 78.97,-45.75\"/>\n",
+ "</g>\n",
+ "<!-- pcoll628 -->\n",
+ "<g id=\"node9\" class=\"node\">\n",
+ "<title>pcoll628</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"233\" cy=\"-90\"
rx=\"18\" ry=\"18\"/>\n",
+ "</g>\n",
+ "<!-- [15]: GetRelationships->pcoll628 -->\n",
+ "<g id=\"edge9\" class=\"edge\">\n",
+ "<title>[15]: GetRelationships->pcoll628</title>\n",
+ "<path fill=\"none\" stroke=\"black\"
d=\"M233,-143.7C233,-135.98 233,-126.71 233,-118.11\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"236.5,-118.1
233,-108.1 229.5,-118.1 236.5,-118.1\"/>\n",
+ "</g>\n",
+ "<!-- [15]: WriteToBigQuery -->\n",
+ "<g id=\"node10\" class=\"node\">\n",
+ "<title>[15]: WriteToBigQuery</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"306,-36 162,-36
162,0 306,0 306,-36\"/>\n",
+ "<text text-anchor=\"middle\" x=\"234\" y=\"-14.3\"
font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[15]:
WriteToBigQuery</text>\n",
+ "</g>\n",
+ "<!-- pcoll628->[15]: WriteToBigQuery -->\n",
+ "<g id=\"edge10\" class=\"edge\">\n",
+ "<title>pcoll628->[15]: WriteToBigQuery</title>\n",
+ "<path fill=\"none\" stroke=\"black\"
d=\"M233.25,-71.7C233.36,-63.98 233.49,-54.71 233.61,-46.11\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"237.11,-46.15
233.76,-36.1 230.11,-46.05 237.11,-46.15\"/>\n",
+ "</g>\n",
+ "<!-- pcoll9933 -->\n",
+ "<g id=\"node12\" class=\"node\">\n",
+ "<title>pcoll9933</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"397\" cy=\"-90\"
rx=\"18\" ry=\"18\"/>\n",
+ "</g>\n",
+ "<!-- [16]: GetEntityMentions->pcoll9933 -->\n",
+ "<g id=\"edge11\" class=\"edge\">\n",
+ "<title>[16]: GetEntityMentions->pcoll9933</title>\n",
+ "<path fill=\"none\" stroke=\"black\"
d=\"M397,-143.7C397,-135.98 397,-126.71 397,-118.11\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"400.5,-118.1
397,-108.1 393.5,-118.1 400.5,-118.1\"/>\n",
+ "</g>\n",
+ "<!-- [16]: WriteToBigQuery -->\n",
+ "<g id=\"node13\" class=\"node\">\n",
+ "<title>[16]: WriteToBigQuery</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"469,-36 325,-36
325,0 469,0 469,-36\"/>\n",
+ "<text text-anchor=\"middle\" x=\"397\" y=\"-14.3\"
font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[16]:
WriteToBigQuery</text>\n",
+ "</g>\n",
+ "<!-- pcoll9933->[16]: WriteToBigQuery -->\n",
+ "<g id=\"edge12\" class=\"edge\">\n",
+ "<title>pcoll9933->[16]: WriteToBigQuery</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M397,-71.7C397,-63.98
397,-54.71 397,-46.11\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"400.5,-46.1
397,-36.1 393.5,-46.1 400.5,-46.1\"/>\n",
+ "</g>\n",
+ "</g>\n",
+ "</svg>\n"
+ ]
+ },
+ "metadata": {}
+ },
+ {
+ "output_type": "display_data",
+ "data": {
+ "application/javascript": [
+ "\n",
+ " if (typeof window.interactive_beam_jquery ==
'undefined') {\n",
+ " var jqueryScript =
document.createElement('script');\n",
+ " jqueryScript.src =
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n",
+ " jqueryScript.type = 'text/javascript';\n",
+ " jqueryScript.onload = function() {\n",
+ " var datatableScript =
document.createElement('script');\n",
+ " datatableScript.src =
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n",
+ " datatableScript.type = 'text/javascript';\n",
+ " datatableScript.onload = function() {\n",
+ " window.interactive_beam_jquery =
jQuery.noConflict(true);\n",
+ "
window.interactive_beam_jquery(document).ready(function($){\n",
+ " \n",
+ "
$(\"#progress_indicator_fa6997b180fa86966dd888a7d59a34f7\").remove();\n",
+ " });\n",
+ " }\n",
+ " document.head.appendChild(datatableScript);\n",
+ " };\n",
+ " document.head.appendChild(jqueryScript);\n",
+ " } else {\n",
+ "
window.interactive_beam_jquery(document).ready(function($){\n",
+ " \n",
+ "
$(\"#progress_indicator_fa6997b180fa86966dd888a7d59a34f7\").remove();\n",
+ " });\n",
+ " }"
+ ]
+ },
+ "metadata": {}
+ }
+ ]
+ },
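+ {
+ "cell_type": "markdown",
+ "source": [
+ "With the *Interactive Runner*, the transforms above are not executed until
results are requested, so nothing has been written to BigQuery yet. The
following cell is a minimal sketch of one way to execute the pipeline (an
addition to the original notebook; it assumes the GCP resources configured
above exist):"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Run the pipeline and block until it finishes; this performs the NLP\n",
+ "# calls and the three BigQuery writes defined above.\n",
+ "p.run().wait_until_finish()"
+ ],
+ "metadata": {},
+ "execution_count": null,
+ "outputs": []
+ }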
+ ]
+}