KevinGG commented on a change in pull request #11469:
URL: https://github.com/apache/beam/pull/11469#discussion_r412455361
##########
File path:
sdks/python/apache_beam/runners/interactive/examples/UsCovidDataExample.ipynb
##########
@@ -0,0 +1,478 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Get data from covidtracking.com\n",
+ "The data set is relatively small and used as a demonstration of working
with Beam in an interactive notebook environment.\n",
+ "\n",
+ "There are two ways to get the data:\n",
+ "\n",
+ "- Get json data from APIs.\n",
+ "- Download data in csv files directly.\n",
+ "\n",
+ "We'll have a batch Beam pipeline example utilizing either method."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import json\n",
+ "import requests\n",
+ "\n",
+ "json_current='https://covidtracking.com/api/v1/states/current.json'\n",
+ "json_historical='https://covidtracking.com/api/v1/states/daily.json'\n",
+ "\n",
+ "def get_json_data(url):\n",
+ " with requests.Session() as session:\n",
+ " data = json.loads(session.get(url).text)\n",
+ " return data\n",
+ "\n",
+ "csv_current = 'https://covidtracking.com/api/v1/states/current.csv'\n",
+ "csv_historical = 'https://covidtracking.com/api/v1/states/daily.csv'\n",
+ "\n",
+ "def download_csv(url, filename):\n",
+ " if not filename.endswith('.csv'):\n",
+ " filename = filename + '.csv'\n",
+ " with requests.Session() as session:\n",
+ " with open(filename, 'wb') as f:\n",
+ " f.write(session.get(url).content)\n",
+ " return filename"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Below reads data into memory as json."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "current_data = get_json_data(json_current)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Below downloads data in csv format stored in files."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "csv_file_current = download_csv(csv_current, 'current')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Prepare some Apache Beam dependencies."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import apache_beam as beam\n",
+ "from apache_beam.runners.interactive import interactive_beam as ib\n",
+ "from apache_beam.runners.interactive.interactive_runner import
InteractiveRunner"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a Beam pipeline."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "p = beam.Pipeline(runner=InteractiveRunner())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "You can create a PCollection from either in-memory json data or data in
files."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "current_data_from_json = p | 'Create PCollection from json' >>
beam.Create(current_data)\n",
+ "current_data_from_files = p | 'Create PCollection from files' >>
beam.io.ReadFromText(csv_file_current, skip_header_lines=1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The in-memory json data is already structured."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "ib.show(current_data_from_json)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The data from files read as plain text is not structured, we'll have to
handle it.\n",
+ "\n",
+ "For a batch pipeline reading files with huge content size, it's normal to
read source data from files and let Beam handle the work load."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "ib.show(current_data_from_files)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We'll parse the plain texts into structured data with Beam SDK."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "current_data_headers =
'state,positive,positiveScore,negativeScore,negativeRegularScore,commercialScore,grade,score,negative,pending,hospitalizedCurrently,hospitalizedCumulative,inIcuCurrently,inIcuCumulative,onVentilatorCurrently,onVentilatorCumulative,recovered,lastUpdateEt,checkTimeEt,death,hospitalized,total,totalTestResults,posNeg,fips,dateModified,dateChecked,notes,hash_val'.split(',')"
Review comment:
Yes, added a `read_headers` function to read headers directly from csv
files.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]