Repository: incubator-systemml
Updated Branches:
  refs/heads/master e1f713aae -> 0daae6cf0


[SYSTEMML-1185] Updating Preprocessing Notebook

Updates to the Preprocessing notebook for Spark 2.x and general cleanup.
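
For context, the Spark 2.x changes in the diff below are: vectors move from the RDD-era `pyspark.mllib.linalg` to the DataFrame-based `pyspark.ml.linalg`, data is loaded through the `SparkSession` entry point (`spark`) instead of `sqlContext`, and output file sizes are capped with the `maxRecordsPerFile` write option instead of a final `repartition`. A minimal sketch of the new I/O idioms, assuming an active PySpark 2.x session named `spark` and hypothetical paths:

    # Read a DataFrame (replaces sqlContext.read.load).
    df = spark.read.load("data/samples.parquet")
    # Write with a cap on rows per output file (the option used by the new save() below).
    df.write.option("maxRecordsPerFile", 85).mode("error").save("data/out.parquet", format="parquet")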


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0daae6cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0daae6cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0daae6cf

Branch: refs/heads/master
Commit: 0daae6cf05961d693e1797c21333904b24d45e2f
Parents: e1f713a
Author: Mike Dusenberry <[email protected]>
Authored: Thu Feb 23 14:52:51 2017 -0800
Committer: Mike Dusenberry <[email protected]>
Committed: Thu Feb 23 14:52:51 2017 -0800

----------------------------------------------------------------------
 projects/breast_cancer/Preprocessing.ipynb | 424 +++++++++++++++---------
 1 file changed, 270 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0daae6cf/projects/breast_cancer/Preprocessing.ipynb
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/Preprocessing.ipynb b/projects/breast_cancer/Preprocessing.ipynb
index ed647b9..3db7560 100644
--- a/projects/breast_cancer/Preprocessing.ipynb
+++ b/projects/breast_cancer/Preprocessing.ipynb
@@ -2,7 +2,10 @@
  "cells": [
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Predicting Breast Cancer Proliferation Scores with Apache Spark and 
Apache SystemML\n",
     "## Preprocessing\n",
@@ -11,7 +14,10 @@
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Setup"
    ]
@@ -20,7 +26,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -35,7 +43,7 @@
     "import openslide\n",
     "from openslide.deepzoom import DeepZoomGenerator\n",
     "import pandas as pd\n",
-    "from pyspark.mllib.linalg import Vectors\n",
+    "from pyspark.ml.linalg import Vectors\n",
     "from scipy.ndimage.morphology import binary_fill_holes\n",
     "from skimage.color import rgb2gray\n",
     "from skimage.feature import canny\n",
@@ -46,7 +54,10 @@
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Open Whole-Slide Image"
    ]
@@ -55,7 +66,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -75,17 +88,22 @@
     "    An OpenSlide object representing a whole-slide image.\n",
     "  \"\"\"\n",
     "  if training:\n",
-    "    filename = os.path.join(folder, \"training_image_data\", 
\"TUPAC-TR-{}.svs\".format(str(slide_num).zfill(3)))\n",
+    "    filename = os.path.join(folder, \"training_image_data\",\n",
+    "                            
\"TUPAC-TR-{}.svs\".format(str(slide_num).zfill(3)))\n",
     "  else:\n",
     "    # Testing images\n",
-    "    filename = os.path.join(folder, \"testing_image_data\", 
\"TUPAC-TE-{}.svs\".format(str(slide_num).zfill(3)))\n",
+    "    filename = os.path.join(folder, \"testing_image_data\",\n",
+    "                            
\"TUPAC-TE-{}.svs\".format(str(slide_num).zfill(3)))\n",
     "  slide = openslide.open_slide(filename)\n",
     "  return slide"
    ]
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Create Tile Generator"
    ]
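
(The tile-generator cell itself is unchanged, so only its metadata appears in the next hunk. For context, a deep-zoom tile generator over an open slide is built roughly as follows; the argument values here are illustrative, not taken from this diff:)

    import openslide
    from openslide.deepzoom import DeepZoomGenerator

    slide = openslide.open_slide("TUPAC-TR-001.svs")  # hypothetical local path
    # Lazily exposes a pyramid of square tiles; no pixels are read until get_tile().
    generator = DeepZoomGenerator(slide, tile_size=1024, overlap=0)
    tile = generator.get_tile(generator.level_count - 1, (0, 0))  # PIL image at the highest-resolution level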
@@ -94,7 +112,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -122,7 +142,10 @@
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Determine 20x Magnification Zoom Level"
    ]
@@ -131,7 +154,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -170,7 +195,10 @@
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Generate Tile Indices For Whole-Slide Image."
    ]
@@ -179,7 +207,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -220,7 +250,10 @@
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Generate Tile From Tile Index"
    ]
@@ -229,7 +262,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -259,14 +294,17 @@
     "  slide = open_slide(slide_num, folder, training)\n",
     "  # Create tile generator.\n",
     "  generator = create_tile_generator(slide, tile_size, overlap)\n",
-    "  # Generate tile\n",
+    "  # Generate tile.\n",
     "  tile = np.array(generator.get_tile(zoom_level, (col, row)))\n",
     "  return (slide_num, tile)"
    ]
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Filter Tile For Dimensions & Tissue Threshold"
    ]
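
(The `keep_tile` cell is likewise unchanged in content in the next hunk. As a rough sketch of what a dimensions-and-tissue filter of this kind checks -- not necessarily the notebook's exact implementation -- one common heuristic treats sufficiently dark pixels as tissue:)

    from skimage.color import rgb2gray

    def tissue_fraction(tile):
      gray = rgb2gray(tile)   # RGB array -> grayscale floats in [0, 1]
      mask = gray < 0.8       # stained tissue is darker than the white background (hypothetical threshold)
      return mask.mean()

    # keep = (tile.shape[0:2] == (tile_size, tile_size)) and (tissue_fraction(tile) >= tissue_threshold)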
@@ -275,7 +313,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -332,7 +372,10 @@
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Generate Flattened Samples From Tile"
    ]
@@ -341,7 +384,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -378,7 +423,8 @@
     "  x, y, ch = tile.shape\n",
     "  # 1. Reshape into a 5D array of (num_x, sample_size_x, num_y, 
sample_size_y, ch), where\n",
     "  # num_x and num_y are the number of chopped tiles on the x and y axes, 
respectively.\n",
-    "  # 2. Swap sample_size_x and num_y axes to create (num_x, num_y, 
sample_size_x, sample_size_y, ch).\n",
+    "  # 2. Swap sample_size_x and num_y axes to create\n",
+    "  # (num_x, num_y, sample_size_x, sample_size_y, ch).\n",
     "  # 3. Combine num_x and num_y into single axis, returning\n",
     "  # (num_samples, sample_size_x, sample_size_y, ch).\n",
     "  # 4. Swap axes from (num_samples, sample_size_x, sample_size_y, ch) 
to\n",
@@ -395,7 +441,10 @@
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Visualize Tile"
    ]
@@ -404,7 +453,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -424,7 +475,10 @@
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Visualize Sample"
    ]
@@ -433,7 +487,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -464,7 +520,10 @@
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Get Ground Truth Labels"
    ]
@@ -473,7 +532,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -489,7 +550,7 @@
     "  labels = pd.read_csv(filename, 
names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
     "  labels[\"slide_num\"] = range(1, 501)\n",
     "\n",
-    "  # Create slide_num -> tumor_score, and slide_num -> molecular_score 
dictionaries\n",
+    "  # Create slide_num -> tumor_score, and slide_num -> molecular_score 
dictionaries.\n",
     "  tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, 
labels.tumor_score)}\n",
     "  molecular_score_dict = {int(s): float(l) for s,l in 
zip(labels.slide_num, labels.molecular_score)}\n",
     "  return tumor_score_dict, molecular_score_dict"
@@ -497,7 +558,10 @@
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Process All Slides Into A Saved Spark DataFrame"
    ]
@@ -506,7 +570,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -523,7 +589,7 @@
     "  \n",
     "  Args:\n",
     "    slide_nums: List of whole-slide numbers to process.\n",
-    "    folder: Directory in which the slides folder is stored, as a 
string.\n",
+    "    folder: Local directory in which the slides folder is stored, as a 
string.\n",
     "      This should contain either a `training_image_data` folder with\n",
     "      images in the format `TUPAC-TR-###.svs`, or a 
`testing_image_data`\n",
     "      folder with images in the format `TUPAC-TE-###.svs`.\n",
@@ -542,65 +608,78 @@
     "    molecular score, and the sample stretched out into a Vector.\n",
     "  \"\"\"\n",
     "  slides = sc.parallelize(slide_nums)\n",
-    "  # Force even partitioning by collecting and parallelizing -- for memory 
issues\n",
+    "  # Force even partitioning by collecting and parallelizing -- for memory 
issues.\n",
+    "  # TODO: Explore computing the ideal paritition sizes based on projected 
number\n",
+    "  #   of tiles after filtering.\n",
     "  ## HACK Note: This was a PySpark bug with a fix in the master branch 
now.\n",
-    "  tile_indices = slides.flatMap(lambda slide: process_slide(slide, 
folder, training, tile_size, overlap)).collect()\n",
-    "  tile_indices = sc.parallelize(tile_indices, num_partitions)\n",
+    "  #tile_indices = slides.flatMap(\n",
+    "  #    lambda slide: process_slide(slide, folder, training, tile_size, 
overlap)).collect()\n",
+    "  #tile_indices = sc.parallelize(tile_indices, num_partitions)\n",
     "  ## END HACK -- update later\n",
+    "  tile_indices = (slides.flatMap(\n",
+    "      lambda slide: process_slide(slide, folder, training, tile_size, 
overlap)))\n",
+    "  tile_indices = tile_indices.repartition(num_partitions)\n",
     "  tiles = tile_indices.map(lambda tile_index: 
process_tile_index(tile_index, folder, training))\n",
     "  filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, 
tissue_threshold))\n",
     "  samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, 
sample_size, grayscale))\n",
     "  if training:\n",
     "    tumor_score_dict, molecular_score_dict = 
create_ground_truth_maps(folder)\n",
-    "    samples_with_labels = (samples.map(lambda tup: \n",
-    "                                       (tup[0], tumor_score_dict[tup[0]], 
molecular_score_dict[tup[0]],\n",
-    "                                        Vectors.dense(tup[1]))))\n",
+    "    samples_with_labels = (samples.map(\n",
+    "        lambda tup: (tup[0], tumor_score_dict[tup[0]],\n",
+    "                     molecular_score_dict[tup[0]], 
Vectors.dense(tup[1]))))\n",
     "    df = samples_with_labels.toDF([\"slide_num\", \"tumor_score\", 
\"molecular_score\", \"sample\"])\n",
-    "    df = df.select(df.slide_num.astype(\"int\"), 
df.tumor_score.astype(\"int\"), df.molecular_score, df[\"sample\"])\n",
+    "    df = df.select(df.slide_num.astype(\"int\"), 
df.tumor_score.astype(\"int\"),\n",
+    "                   df.molecular_score, df[\"sample\"])\n",
     "  else:  # testing data -- no labels\n",
     "    df = samples.toDF([\"slide_num\", \"sample\"])\n",
     "    df = df.select(df.slide_num.astype(\"int\"), df[\"sample\"])\n",
-    "  df = df.repartition(num_partitions)  # Even out the partitions\n",
+    "  #df = df.repartition(num_partitions)  # Even out the partitions\n",
     "  return df\n",
     "\n",
-    "def save(df, training=True, sample_size=256, grayscale=False, 
mode=\"error\"):\n",
+    "def save(df, filename, sample_size=256, grayscale=False, 
folder=\"data\",\n",
+    "         mode=\"error\", format=\"parquet\", file_size=128):\n",
     "  \"\"\"\n",
-    "  Save a preprocessed DataFrame of samples in Parquet format.\n",
-    "  \n",
-    "  The filename will be formatted as follows:\n",
-    "    `samples_{labels|testing}_SAMPLE-SIZE[_grayscale].parquet`\n",
+    "  Save a preprocessed DataFrame with a constraint on the file sizes.\n",
     "  \n",
     "  Args:\n",
-    "    df: A DataFrame in which each row contains the slide number, tumor 
score,\n",
-    "      molecular score, and the sample stretched out into a Vector.\n",
-    "    training: Boolean for training or testing datasets.\n",
+    "    df: A DataFrame.\n",
+    "    filename: Name of the file to save.\n",
     "    sample_size: The width and height of the square samples.\n",
     "    grayscale: Whether or not to the samples are in grayscale format, 
rather\n",
     "      than RGB.\n",
+    "    folder: HDFS directory in which to save the DataFrame.\n",
     "    mode: Specifies the behavior of `df.write.mode` when the data already 
exists.\n",
     "      Options include:\n",
     "        * `append`: Append contents of this :class:`DataFrame` to 
existing data.\n",
     "        * `overwrite`: Overwrite existing data.\n",
     "        * `error`: Throw an exception if data already exists.\n",
     "        * `ignore`: Silently ignore this operation if data already 
exists.\n",
+    "    format: The format in which to save the DataFrame.\n",
+    "    file_size: Size in MB of each saved file.  128 MB is an empirically 
ideal size.\n",
     "  \"\"\"\n",
-    "  filename = \"samples_{}_{}{}.parquet\".format(\"labels\" if training 
else \"testing\",\n",
-    "                                              sample_size,\n",
-    "                                              \"_grayscale\" if grayscale 
else \"\")\n",
-    "  filepath = os.path.join(\"data\", filename)\n",
-    "  df.write.mode(mode).save(filepath, format=\"parquet\")"
+    "  channels = 1 if grayscale else 3\n",
+    "  row_mb = sample_size * sample_size * channels * 8 / 1024 / 1024  # size 
of one row in MB\n",
+    "  rows_per_file = round(file_size / row_mb)\n",
+    "  filepath = os.path.join(folder, filename)\n",
+    "  df.write.option(\"maxRecordsPerFile\", 
rows_per_file).mode(mode).save(filepath, format=format)"
    ]
   },
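
Worked example of the file-size arithmetic in the new `save` above, for the default 256x256 RGB samples stored as 8-byte doubles (a sketch of the same computation, not additional notebook code):

    sample_size, channels, file_size = 256, 3, 128
    row_mb = sample_size * sample_size * channels * 8 / 1024 / 1024  # = 1.5 MB per row
    rows_per_file = round(file_size / row_mb)                        # = round(85.33) = 85 rows per ~128 MB file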
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "---"
    ]
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Execute Preprocessing & Save"
    ]
@@ -609,7 +688,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
@@ -623,11 +704,13 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
-    "# Get list of image numbers, minus the broken ones\n",
+    "# Get list of image numbers, minus the broken ones.\n",
     "broken = {2, 45, 91, 112, 242, 256, 280, 313, 329, 467}\n",
     "slide_nums = sorted(set(range(1,501)) - broken)\n",
     "\n",
@@ -637,18 +720,24 @@
     "sample_size = 256\n",
     "grayscale = False\n",
     "num_partitions = 20000\n",
-    "folder = \"/home/MDM/breast_cancer/data\""
+    "folder = \"/home/MDM/breast_cancer/data\"\n",
+    "filename = \"samples_{}_{}{}.parquet\".format(\n",
+    "    \"labels\" if training else \"testing\", sample_size, \"_grayscale\" 
if grayscale else \"\")\n",
+    "tr_filename = \"train_{}{}.parquet\".format(sample_size, \"_grayscale\" 
if grayscale else \"\")\n",
+    "val_filename = \"val_{}{}.parquet\".format(sample_size, \"_grayscale\" if 
grayscale else \"\")"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
-    "# Process all slides\n",
+    "# Process all slides.\n",
     "df = preprocess(slide_nums, tile_size=tile_size, sample_size=sample_size, 
grayscale=grayscale,\n",
     "                training=training, num_partitions=num_partitions, 
folder=folder)\n",
     "df"
@@ -658,31 +747,32 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
-    "# Save DataFrame of samples\n",
-    "save(df, sample_size=sample_size, grayscale=grayscale, training=training)"
+    "# Save DataFrame of samples.\n",
+    "save(df, filename, sample_size, grayscale)"
    ]
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "---"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Split Into Separate Train & Validation DataFrames Based On Slide Number"
    ]
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "### TODO: Wrap this in a function with appropriate default arguments"
    ]
@@ -691,68 +781,85 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
-    "filename = \"samples_{}_{}{}.parquet\".format(\"labels\" if training else 
\"testing\",\n",
-    "                                            sample_size,\n",
-    "                                            \"_grayscale\" if grayscale 
else \"\")\n",
+    "# Load full DataFrame from disk.\n",
     "filepath = os.path.join(\"data\", filename)\n",
-    "df = sqlContext.read.load(filepath)"
+    "df = spark.read.load(filepath)"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true,
+    "scrolled": true
    },
    "outputs": [],
    "source": [
-    "labels = pd.read_csv(\"data/training_ground_truth.csv\", 
names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
+    "# Determine how to split data.\n",
+    "labels = pd.read_csv(\n",
+    "    \"data/training_ground_truth.csv\", 
names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
     "labels[\"slide_num\"] = range(1, 501)\n",
     "\n",
-    "# Create slide_num -> tumor_score and slide_num -> molecular_score 
dictionaries\n",
-    "tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, 
labels.tumor_score)}\n",
-    "molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, 
labels.molecular_score)}"
+    "# # Create slide_num -> tumor_score and slide_num -> molecular_score 
dictionaries\n",
+    "# tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, 
labels.tumor_score)}\n",
+    "# molecular_score_dict = {int(s): float(l) for s,l in 
zip(labels.slide_num, labels.molecular_score)}\n",
+    "\n",
+    "print(labels[\"tumor_score\"].value_counts(sort=False))\n",
+    "print(labels[\"tumor_score\"].value_counts(normalize=True, 
sort=False))\n",
+    "print(labels[labels.slide_num <= 
400][\"tumor_score\"].value_counts(normalize=True, sort=False))\n",
+    "print(labels[labels.slide_num > 
400][\"tumor_score\"].value_counts(normalize=True, sort=False))"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
-    "print(labels[\"tumor_score\"].value_counts() / 
labels.tumor_score.count())\n",
-    "print(labels[labels.slide_num > 400][\"tumor_score\"].value_counts() / 
labels[labels.slide_num > 400].tumor_score.count())"
+    "# Split data into train and validation sets.\n",
+    "# TODO: Stratified random split.\n",
+    "train = df.where(df.slide_num <= 400)\n",
+    "val = df.where(df.slide_num > 400)\n",
+    "train, val"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
-    "train = (df.where(df.slide_num <= 400)\n",
-    "           .rdd\n",
-    "           .zipWithIndex()\n",
-    "           .map(lambda r: (r[1] + 1, *r[0]))\n",
-    "           .toDF(['__INDEX', 'slide_num', 'tumor_score', 
'molecular_score', 'sample']))\n",
-    "train = train.select(train[\"__INDEX\"].astype(\"int\"), 
train.slide_num.astype(\"int\"), train.tumor_score.astype(\"int\"),\n",
-    "                     train.molecular_score, train[\"sample\"])\n",
+    "# Add row indices for use with SystemML.\n",
+    "# TODO: Wrap this in a function with appropriate default arguments.\n",
+    "train = (train.rdd\n",
+    "              .zipWithIndex()\n",
+    "              .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert 
index to 1-based indexing\n",
+    "              .toDF(['__INDEX', 'slide_num', 'tumor_score', 
'molecular_score', 'sample']))\n",
+    "train = train.select(train[\"__INDEX\"].astype(\"int\"), 
train.slide_num.astype(\"int\"), \n",
+    "                     train.tumor_score.astype(\"int\"), 
train.molecular_score, train[\"sample\"])\n",
     "\n",
-    "val = (df.where(df.slide_num > 400)\n",
-    "         .rdd\n",
-    "         .zipWithIndex()\n",
-    "         .map(lambda r: (r[1] + 1, *r[0]))\n",
-    "         .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 
'sample']))\n",
-    "val = val.select(val[\"__INDEX\"].astype(\"int\"), 
val.slide_num.astype(\"int\"), val.tumor_score.astype(\"int\"),\n",
-    "                 val.molecular_score, val[\"sample\"])\n",
+    "val = (val.rdd\n",
+    "          .zipWithIndex()\n",
+    "          .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 
1-based indexing\n",
+    "          .toDF(['__INDEX', 'slide_num', 'tumor_score', 
'molecular_score', 'sample']))\n",
+    "val = val.select(val[\"__INDEX\"].astype(\"int\"), 
val.slide_num.astype(\"int\"),\n",
+    "                 val.tumor_score.astype(\"int\"), val.molecular_score, 
val[\"sample\"])\n",
     "\n",
     "train, val"
    ]
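
The `zipWithIndex` pattern above attaches the 1-based `__INDEX` row number used with SystemML. A self-contained toy illustration with hypothetical data, assuming an active SparkSession named `spark`:

    toy = spark.createDataFrame([(401, 3), (402, 2)], ["slide_num", "tumor_score"])
    indexed = (toy.rdd
                  .zipWithIndex()                    # pairs each Row with a 0-based long index
                  .map(lambda r: (r[1] + 1, *r[0]))  # flatten the Row and shift the index to 1-based
                  .toDF(["__INDEX", "slide_num", "tumor_score"]))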
@@ -761,23 +868,23 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
-    "# Write\n",
-    "# TODO: Wrap this in a function with appropriate default arguments\n",
-    "mode = \"error\"\n",
-    "tr_filename = os.path.join(\"data\", 
\"train_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else 
\"\"))\n",
-    "val_filename = os.path.join(\"data\", 
\"val_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else 
\"\"))\n",
-    "train.write.mode(mode).save(tr_filename, format=\"parquet\")\n",
-    "val.write.mode(mode).save(val_filename, format=\"parquet\")"
+    "# Save train and validation DataFrames.\n",
+    "save(train, tr_filename, sample_size, grayscale)\n",
+    "save(val, val_filename, sample_size, grayscale)"
    ]
   },
   {
    "cell_type": "markdown",
    "metadata": {
-    "collapsed": true
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
    },
    "source": [
     "---"
@@ -785,14 +892,20 @@
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "# Sample Data"
    ]
   },
   {
    "cell_type": "markdown",
-   "metadata": {},
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
    "source": [
     "### TODO: Wrap this in a function with appropriate default arguments"
    ]
@@ -801,25 +914,28 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
-    "tr_filename = os.path.join(\"data\", 
\"train_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else 
\"\"))\n",
-    "val_filename = os.path.join(\"data\", 
\"val_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else 
\"\"))\n",
-    "train = sqlContext.read.load(tr_filename)\n",
-    "val = sqlContext.read.load(val_filename)"
+    "# Load train and validation DataFrames from disk.\n",
+    "train = spark.read.load(tr_filename)\n",
+    "val = spark.read.load(val_filename)"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
-    "# Take a stratified sample\n",
+    "# Take a stratified sample.\n",
     "p=0.01\n",
     "train_sample = train.drop(\"__INDEX\").sampleBy(\"tumor_score\", 
fractions={1: p, 2: p, 3: p}, seed=42)\n",
     "val_sample = val.drop(\"__INDEX\").sampleBy(\"tumor_score\", 
fractions={1: p, 2: p, 3: p}, seed=42)\n",
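
`sampleBy` draws an approximately `p`-sized fraction independently from each `tumor_score` stratum, so the sample roughly preserves the class balance of the full data; the resulting counts are approximate rather than exact. The `__INDEX` column is dropped first because the sampled rows are re-indexed afterwards. A quick sanity check of the stratified balance (a sketch, assuming the DataFrames above):

    counts = train_sample.groupBy("tumor_score").count()  # rows drawn per class
    counts.show()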
@@ -830,62 +946,62 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
    },
    "outputs": [],
    "source": [
-    "# TODO: turn this into a function\n",
-    "# Repartition to get ~128MB partitions\n",
-    "\n",
-    "# TODO: Update logic to use the following to automatically\n",
-    "# select the number of partitions:\n",
-    "# ex_mb = SIZE*SIZE*CHANNELS * 8 / 1024 / 1024  # size of one example in 
MB\n",
-    "# ideal_part_size_mb = 128  # 128 MB partitions sizes are empirically 
ideal\n",
-    "# ideal_exs_per_part = round(ideal_part_size_mb / ex_mb)\n",
-    "# tr_parts = round(tc / ideal_exs_per_part)\n",
-    "# val_parts = round(vc / ideal_exs_per_part)\n",
-    "\n",
-    "if grayscale:\n",
-    "  train_sample = train_sample.repartition(150)  #300) #3000)\n",
-    "  val_sample = val_sample.repartition(40)  #80) #800)\n",
-    "else:  # 3x\n",
-    "  train_sample = train_sample.repartition(450)  #900) #9000)\n",
-    "  val_sample = val_sample.repartition(120)  #240) #2400)\n",
-    "\n",
-    "# Reassign row indices\n",
+    "# Reassign row indices.\n",
+    "# TODO: Wrap this in a function with appropriate default arguments.\n",
     "train_sample = (\n",
     "  train_sample.rdd\n",
     "              .zipWithIndex()\n",
     "              .map(lambda r: (r[1] + 1, *r[0]))\n",
     "              .toDF(['__INDEX', 'slide_num', 'tumor_score', 
'molecular_score', 'sample']))\n",
-    "train_sample = 
train_sample.select(train_sample[\"__INDEX\"].astype(\"int\"), 
train_sample.slide_num.astype(\"int\"), \n",
-    "                                   
train_sample.tumor_score.astype(\"int\"), train_sample.molecular_score, 
train_sample[\"sample\"])\n",
+    "train_sample = 
train_sample.select(train_sample[\"__INDEX\"].astype(\"int\"),\n",
+    "                                   
train_sample.slide_num.astype(\"int\"), \n",
+    "                                   
train_sample.tumor_score.astype(\"int\"),\n",
+    "                                   train_sample.molecular_score,\n",
+    "                                   train_sample[\"sample\"])\n",
     "\n",
     "val_sample = (\n",
     "  val_sample.rdd\n",
     "            .zipWithIndex()\n",
     "            .map(lambda r: (r[1] + 1, *r[0]))\n",
     "            .toDF(['__INDEX', 'slide_num', 'tumor_score', 
'molecular_score', 'sample']))\n",
-    "val_sample = val_sample.select(val_sample[\"__INDEX\"].astype(\"int\"), 
val_sample.slide_num.astype(\"int\"), \n",
-    "                               val_sample.tumor_score.astype(\"int\"), 
val_sample.molecular_score, val_sample[\"sample\"])\n",
-    "\n",
-    "train_sample, val_sample\n",
+    "val_sample = 
val_sample.select(val_sample[\"__INDEX\"].astype(\"int\"),\n",
+    "                               val_sample.slide_num.astype(\"int\"), \n",
+    "                               val_sample.tumor_score.astype(\"int\"),\n",
+    "                               val_sample.molecular_score,\n",
+    "                               val_sample[\"sample\"])\n",
     "\n",
-    "# Write\n",
-    "# TODO: Wrap this in a function with appropriate default arguments\n",
-    "mode = \"error\"\n",
-    "tr_sample_filename = os.path.join(\"data\", 
\"train_{}_sample_{}{}.parquet\".format(p, sample_size, \"_grayscale\" if 
grayscale else \"\"))\n",
-    "val_sample_filename = os.path.join(\"data\", 
\"val_{}_sample_{}{}.parquet\".format(p, sample_size, \"_grayscale\" if 
grayscale else \"\"))\n",
-    "train_sample.write.mode(mode).save(tr_sample_filename, 
format=\"parquet\")\n",
-    "val_sample.write.mode(mode).save(val_sample_filename, format=\"parquet\")"
+    "train_sample, val_sample"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
+   },
+   "outputs": [],
+   "source": [
+    "# Save train and validation DataFrames.\n",
+    "tr_sample_filename = \"train_{}_sample_{}{}.parquet\".format(p, 
sample_size, \"_grayscale\" if grayscale else \"\")\n",
+    "val_sample_filename = \"val_{}_sample_{}{}.parquet\".format(p, 
sample_size, \"_grayscale\" if grayscale else \"\")\n",
+    "save(train_sample, tr_sample_filename, sample_size, grayscale)\n",
+    "save(val_sample, val_sample_filename, sample_size, grayscale)"
    ]
   }
  ],
  "metadata": {
   "kernelspec": {
-   "display_name": "Python 3",
+   "display_name": "Python 3 + Spark 2.x + SystemML",
    "language": "python",
-   "name": "python3"
+   "name": "pyspark3_2.x"
   },
   "language_info": {
    "codemirror_mode": {
