Repository: incubator-systemml
Updated Branches:
  refs/heads/master e1f713aae -> 0daae6cf0
[SYSTEMML-1185] Updating Preprocessing Notebook

Updates to the Preprocessing notebook for Spark 2.x and general cleanup.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0daae6cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0daae6cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0daae6cf

Branch: refs/heads/master
Commit: 0daae6cf05961d693e1797c21333904b24d45e2f
Parents: e1f713a
Author: Mike Dusenberry <[email protected]>
Authored: Thu Feb 23 14:52:51 2017 -0800
Committer: Mike Dusenberry <[email protected]>
Committed: Thu Feb 23 14:52:51 2017 -0800

----------------------------------------------------------------------
 projects/breast_cancer/Preprocessing.ipynb | 424 +++++++++++++++---------
 1 file changed, 270 insertions(+), 154 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0daae6cf/projects/breast_cancer/Preprocessing.ipynb
----------------------------------------------------------------------

diff --git a/projects/breast_cancer/Preprocessing.ipynb b/projects/breast_cancer/Preprocessing.ipynb
index ed647b9..3db7560 100644
--- a/projects/breast_cancer/Preprocessing.ipynb
+++ b/projects/breast_cancer/Preprocessing.ipynb

@@ -2,7 +2,10 @@ "cells": [ { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Predicting Breast Cancer Proliferation Scores with Apache Spark and Apache SystemML\n", "## Preprocessing\n", @@ -11,7 +14,10 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -35,7 +43,7 @@ "import openslide\n", "from openslide.deepzoom import DeepZoomGenerator\n", "import pandas as pd\n", - "from pyspark.mllib.linalg import Vectors\n", + "from pyspark.ml.linalg import Vectors\n", "from scipy.ndimage.morphology import binary_fill_holes\n", "from skimage.color import rgb2gray\n", "from skimage.feature import canny\n", @@ -46,7 +54,10 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Open Whole-Slide Image" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -75,17 +88,22 @@ " An OpenSlide object representing a whole-slide image.\n", " \"\"\"\n", " if training:\n", - " filename = os.path.join(folder, \"training_image_data\", \"TUPAC-TR-{}.svs\".format(str(slide_num).zfill(3)))\n", + " filename = os.path.join(folder, \"training_image_data\",\n", + " \"TUPAC-TR-{}.svs\".format(str(slide_num).zfill(3)))\n", " else:\n", " # Testing images\n", - " filename = os.path.join(folder, \"testing_image_data\", \"TUPAC-TE-{}.svs\".format(str(slide_num).zfill(3)))\n", + " filename = os.path.join(folder, \"testing_image_data\",\n", + " \"TUPAC-TE-{}.svs\".format(str(slide_num).zfill(3)))\n", " slide = openslide.open_slide(filename)\n", " return slide" ] }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Create Tile Generator" ] }, { "cell_type": "code", "execution_count": null, "metadata": { @@ -94,7 +112,9
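----------------------------------------------------------------------
The `pyspark.mllib.linalg` -> `pyspark.ml.linalg` change above is the key
Spark 2.x update: the DataFrame-based APIs use the `ml` package's vector
type, so samples assembled into DataFrames must be built from `ml` Vectors.
A minimal sketch (the `spark` session variable here is an assumption):

    from pyspark.sql import SparkSession
    from pyspark.ml.linalg import Vectors

    spark = SparkSession.builder.getOrCreate()
    # A toy (slide_num, sample) row shaped like the preprocessed samples below.
    df = spark.createDataFrame([(1, Vectors.dense([0.1, 0.2, 0.3]))],
                               ["slide_num", "sample"])
    df.printSchema()  # `sample` is a vector column usable by spark.ml
----------------------------------------------------------------------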
@@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": true, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -122,7 +142,10 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Determine 20x Magnification Zoom Level" ] @@ -131,7 +154,9 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -170,7 +195,10 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Generate Tile Indices For Whole-Slide Image." ] @@ -179,7 +207,9 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": true, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -220,7 +250,10 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Generate Tile From Tile Index" ] @@ -229,7 +262,9 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": true, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -259,14 +294,17 @@ " slide = open_slide(slide_num, folder, training)\n", " # Create tile generator.\n", " generator = create_tile_generator(slide, tile_size, overlap)\n", - " # Generate tile\n", + " # Generate tile.\n", " tile = np.array(generator.get_tile(zoom_level, (col, row)))\n", " return (slide_num, tile)" ] }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Filter Tile For Dimensions & Tissue Threshold" ] @@ -275,7 +313,9 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -332,7 +372,10 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Generate Flattened Samples From Tile" ] @@ -341,7 +384,9 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": true, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -378,7 +423,8 @@ " x, y, ch = tile.shape\n", " # 1. Reshape into a 5D array of (num_x, sample_size_x, num_y, sample_size_y, ch), where\n", " # num_x and num_y are the number of chopped tiles on the x and y axes, respectively.\n", - " # 2. Swap sample_size_x and num_y axes to create (num_x, num_y, sample_size_x, sample_size_y, ch).\n", + " # 2. Swap sample_size_x and num_y axes to create\n", + " # (num_x, num_y, sample_size_x, sample_size_y, ch).\n", " # 3. Combine num_x and num_y into single axis, returning\n", " # (num_samples, sample_size_x, sample_size_y, ch).\n", " # 4. 
Swap axes from (num_samples, sample_size_x, sample_size_y, ch) to\n", @@ -395,7 +441,10 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Visualize Tile" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": true, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -424,7 +475,10 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Visualize Sample" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": true, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -464,7 +520,10 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Get Ground Truth Labels" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -489,7 +550,7 @@ " labels = pd.read_csv(filename, names=[\"tumor_score\",\"molecular_score\"], header=None)\n", " labels[\"slide_num\"] = range(1, 501)\n", "\n", - " # Create slide_num -> tumor_score, and slide_num -> molecular_score dictionaries\n", + " # Create slide_num -> tumor_score, and slide_num -> molecular_score dictionaries.\n", " tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}\n", " molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}\n", " return tumor_score_dict, molecular_score_dict" @@ -497,7 +558,10 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Process All Slides Into A Saved Spark DataFrame" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -523,7 +589,7 @@ " \n", " Args:\n", " slide_nums: List of whole-slide numbers to process.\n", - " folder: Directory in which the slides folder is stored, as a string.\n", + " folder: Local directory in which the slides folder is stored, as a string.\n", " This should contain either a `training_image_data` folder with\n", " images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`\n", " folder with images in the format `TUPAC-TE-###.svs`.\n", @@ -542,65 +608,78 @@ " molecular score, and the sample stretched out into a Vector.\n", " \"\"\"\n", " slides = sc.parallelize(slide_nums)\n", - " # Force even partitioning by collecting and parallelizing -- for memory issues\n", + " # Force even partitioning by collecting and parallelizing -- for memory issues.\n", + " # TODO: Explore computing the ideal partition sizes based on projected number\n", + " # of tiles after filtering.\n", " ## HACK Note: This was a PySpark bug with a fix in the master branch now.\n", - " tile_indices = slides.flatMap(lambda slide: process_slide(slide, folder, training, tile_size, overlap)).collect()\n", - " tile_indices = sc.parallelize(tile_indices, num_partitions)\n", + " #tile_indices = slides.flatMap(\n", + " # lambda slide: process_slide(slide, folder, training, tile_size, overlap)).collect()\n", + " #tile_indices = sc.parallelize(tile_indices, num_partitions)\n", " ## END HACK -- update later\n",
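----------------------------------------------------------------------
The dictionary construction in `create_ground_truth_maps` above is easiest
to see on toy data; the same comprehensions with three fabricated rows
standing in for the real ground-truth CSV:

    import pandas as pd

    labels = pd.DataFrame({"tumor_score": [1, 3, 2],
                           "molecular_score": [0.42, 0.97, 0.60]})  # fabricated rows
    labels["slide_num"] = range(1, len(labels) + 1)
    tumor_score_dict = {int(s): int(l) for s, l in zip(labels.slide_num, labels.tumor_score)}
    molecular_score_dict = {int(s): float(l) for s, l in zip(labels.slide_num, labels.molecular_score)}
    print(tumor_score_dict)      # {1: 1, 2: 3, 3: 2}
    print(molecular_score_dict)  # {1: 0.42, 2: 0.97, 3: 0.6}
----------------------------------------------------------------------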
+ " tile_indices = (slides.flatMap(\n", + " lambda slide: process_slide(slide, folder, training, tile_size, overlap)))\n", + " tile_indices = tile_indices.repartition(num_partitions)\n", " tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))\n", " filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))\n", " samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))\n", " if training:\n", " tumor_score_dict, molecular_score_dict = create_ground_truth_maps(folder)\n", - " samples_with_labels = (samples.map(lambda tup: \n", - " (tup[0], tumor_score_dict[tup[0]], molecular_score_dict[tup[0]],\n", - " Vectors.dense(tup[1]))))\n", + " samples_with_labels = (samples.map(\n", + " lambda tup: (tup[0], tumor_score_dict[tup[0]],\n", + " molecular_score_dict[tup[0]], Vectors.dense(tup[1]))))\n", " df = samples_with_labels.toDF([\"slide_num\", \"tumor_score\", \"molecular_score\", \"sample\"])\n", - " df = df.select(df.slide_num.astype(\"int\"), df.tumor_score.astype(\"int\"), df.molecular_score, df[\"sample\"])\n", + " df = df.select(df.slide_num.astype(\"int\"), df.tumor_score.astype(\"int\"),\n", + " df.molecular_score, df[\"sample\"])\n", " else: # testing data -- no labels\n", " df = samples.toDF([\"slide_num\", \"sample\"])\n", " df = df.select(df.slide_num.astype(\"int\"), df[\"sample\"])\n", - " df = df.repartition(num_partitions) # Even out the partitions\n", + " #df = df.repartition(num_partitions) # Even out the partitions\n", " return df\n", "\n", - "def save(df, training=True, sample_size=256, grayscale=False, mode=\"error\"):\n", + "def save(df, filename, sample_size=256, grayscale=False, folder=\"data\",\n", + " mode=\"error\", format=\"parquet\", file_size=128):\n", " \"\"\"\n", - " Save a preprocessed DataFrame of samples in Parquet format.\n", - " \n", - " The filename will be formatted as follows:\n", - " `samples_{labels|testing}_SAMPLE-SIZE[_grayscale].parquet`\n", + " Save a preprocessed DataFrame with a constraint on the file sizes.\n", " \n", " Args:\n", - " df: A DataFrame in which each row contains the slide number, tumor score,\n", - " molecular score, and the sample stretched out into a Vector.\n", - " training: Boolean for training or testing datasets.\n", + " df: A DataFrame.\n", + " filename: Name of the file to save.\n", " sample_size: The width and height of the square samples.\n", " grayscale: Whether or not the samples are in grayscale format, rather\n", " than RGB.\n", + " folder: HDFS directory in which to save the DataFrame.\n", " mode: Specifies the behavior of `df.write.mode` when the data already exists.\n", " Options include:\n", " * `append`: Append contents of this :class:`DataFrame` to existing data.\n", " * `overwrite`: Overwrite existing data.\n", " * `error`: Throw an exception if data already exists.\n", " * `ignore`: Silently ignore this operation if data already exists.\n", + " format: The format in which to save the DataFrame.\n", + " file_size: Size in MB of each saved file. 
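----------------------------------------------------------------------
The `preprocess` function above is one lazy RDD chain; its shape can be
exercised anywhere with trivial stand-ins for the slide helpers (the stub
functions and the `sc` SparkContext variable are assumptions of this sketch):

    def process_slide_stub(slide):    # slide number -> list of tile indices
        return [(slide, col, row) for col in range(2) for row in range(2)]

    def process_tile_index_stub(ti):  # tile index -> (slide_num, tile)
        return (ti[0], [0.0])

    def keep_tile_stub(tile):         # dimension & tissue filter
        return True

    def process_tile_stub(tile):      # tile -> list of flattened samples
        return [tile]

    samples = (sc.parallelize(range(1, 4))
                 .flatMap(process_slide_stub)
                 .repartition(4)
                 .map(process_tile_index_stub)
                 .filter(keep_tile_stub)
                 .flatMap(process_tile_stub))
    print(samples.count())  # 3 slides x 4 tiles = 12 samples
----------------------------------------------------------------------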
128 MB is an empirically ideal size.\n", " \"\"\"\n", - " filename = \"samples_{}_{}{}.parquet\".format(\"labels\" if training else \"testing\",\n", - " sample_size,\n", - " \"_grayscale\" if grayscale else \"\")\n", - " filepath = os.path.join(\"data\", filename)\n", - " df.write.mode(mode).save(filepath, format=\"parquet\")" + " channels = 1 if grayscale else 3\n", + " row_mb = sample_size * sample_size * channels * 8 / 1024 / 1024 # size of one row in MB\n", + " rows_per_file = round(file_size / row_mb)\n", + " filepath = os.path.join(folder, filename)\n", + " df.write.option(\"maxRecordsPerFile\", rows_per_file).mode(mode).save(filepath, format=format)" ] }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "---" ] }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Execute Preprocessing & Save" ] @@ -609,7 +688,9 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": true, + "deletable": true, + "editable": true }, "outputs": [], "source": [ @@ -623,11 +704,13 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": true, + "deletable": true, + "editable": true }, "outputs": [], "source": [ - "# Get list of image numbers, minus the broken ones\n", + "# Get list of image numbers, minus the broken ones.\n", "broken = {2, 45, 91, 112, 242, 256, 280, 313, 329, 467}\n", "slide_nums = sorted(set(range(1,501)) - broken)\n", "\n", @@ -637,18 +720,24 @@ "sample_size = 256\n", "grayscale = False\n", "num_partitions = 20000\n", - "folder = \"/home/MDM/breast_cancer/data\"" + "folder = \"/home/MDM/breast_cancer/data\"\n", + "filename = \"samples_{}_{}{}.parquet\".format(\n", + " \"labels\" if training else \"testing\", sample_size, \"_grayscale\" if grayscale else \"\")\n", + "tr_filename = \"train_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\")\n", + "val_filename = \"val_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ - "# Process all slides\n", + "# Process all slides.\n", "df = preprocess(slide_nums, tile_size=tile_size, sample_size=sample_size, grayscale=grayscale,\n", " training=training, num_partitions=num_partitions, folder=folder)\n", "df" @@ -658,31 +747,32 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ - "# Save DataFrame of samples\n", - "save(df, sample_size=sample_size, grayscale=grayscale, training=training)" + "# Save DataFrame of samples.\n", + "save(df, filename, sample_size, grayscale)" ] }, { "cell_type": "markdown", - "metadata": {}, - "source": [ - "---" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Split Into Separate Train & Validation DataFrames Based On Slide Number" ] }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "### TODO: Wrap this in a function with appropriate default arguments" ] @@ -691,68 +781,85 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": false, + "deletable": true, + 
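----------------------------------------------------------------------
To make the `maxRecordsPerFile` arithmetic in `save` concrete, here is the
computation for the notebook's default 256x256 RGB samples stored as
8-byte doubles:

    sample_size, channels, file_size = 256, 3, 128  # defaults; 128 MB target
    row_mb = sample_size * sample_size * channels * 8 / 1024 / 1024
    print(row_mb)                     # 1.5 MB per flattened sample row
    print(round(file_size / row_mb))  # 85 rows per saved file
----------------------------------------------------------------------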
"editable": true }, "outputs": [], "source": [ - "filename = \"samples_{}_{}{}.parquet\".format(\"labels\" if training else \"testing\",\n", - " sample_size,\n", - " \"_grayscale\" if grayscale else \"\")\n", + "# Load full DataFrame from disk.\n", "filepath = os.path.join(\"data\", filename)\n", - "df = sqlContext.read.load(filepath)" + "df = spark.read.load(filepath)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true, + "scrolled": true }, "outputs": [], "source": [ - "labels = pd.read_csv(\"data/training_ground_truth.csv\", names=[\"tumor_score\",\"molecular_score\"], header=None)\n", + "# Determine how to split data.\n", + "labels = pd.read_csv(\n", + " \"data/training_ground_truth.csv\", names=[\"tumor_score\",\"molecular_score\"], header=None)\n", "labels[\"slide_num\"] = range(1, 501)\n", "\n", - "# Create slide_num -> tumor_score and slide_num -> molecular_score dictionaries\n", - "tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}\n", - "molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}" + "# # Create slide_num -> tumor_score and slide_num -> molecular_score dictionaries\n", + "# tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}\n", + "# molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}\n", + "\n", + "print(labels[\"tumor_score\"].value_counts(sort=False))\n", + "print(labels[\"tumor_score\"].value_counts(normalize=True, sort=False))\n", + "print(labels[labels.slide_num <= 400][\"tumor_score\"].value_counts(normalize=True, sort=False))\n", + "print(labels[labels.slide_num > 400][\"tumor_score\"].value_counts(normalize=True, sort=False))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ - "print(labels[\"tumor_score\"].value_counts() / labels.tumor_score.count())\n", - "print(labels[labels.slide_num > 400][\"tumor_score\"].value_counts() / labels[labels.slide_num > 400].tumor_score.count())" + "# Split data into train and validation sets.\n", + "# TODO: Stratified random split.\n", + "train = df.where(df.slide_num <= 400)\n", + "val = df.where(df.slide_num > 400)\n", + "train, val" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ - "train = (df.where(df.slide_num <= 400)\n", - " .rdd\n", - " .zipWithIndex()\n", - " .map(lambda r: (r[1] + 1, *r[0]))\n", - " .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n", - "train = train.select(train[\"__INDEX\"].astype(\"int\"), train.slide_num.astype(\"int\"), train.tumor_score.astype(\"int\"),\n", - " train.molecular_score, train[\"sample\"])\n", + "# Add row indices for use with SystemML.\n", + "# TODO: Wrap this in a function with appropriate default arguments.\n", + "train = (train.rdd\n", + " .zipWithIndex()\n", + " .map(lambda r: (r[1] + 1, *r[0])) # flatten & convert index to 1-based indexing\n", + " .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n", + "train = train.select(train[\"__INDEX\"].astype(\"int\"), train.slide_num.astype(\"int\"), \n", + " train.tumor_score.astype(\"int\"), train.molecular_score, train[\"sample\"])\n", "\n", - "val = 
(df.where(df.slide_num > 400)\n", - " .rdd\n", - " .zipWithIndex()\n", - " .map(lambda r: (r[1] + 1, *r[0]))\n", - " .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n", - "val = val.select(val[\"__INDEX\"].astype(\"int\"), val.slide_num.astype(\"int\"), val.tumor_score.astype(\"int\"),\n", - " val.molecular_score, val[\"sample\"])\n", + "val = (val.rdd\n", + " .zipWithIndex()\n", + " .map(lambda r: (r[1] + 1, *r[0])) # flatten & convert index to 1-based indexing\n", + " .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n", + "val = val.select(val[\"__INDEX\"].astype(\"int\"), val.slide_num.astype(\"int\"),\n", + " val.tumor_score.astype(\"int\"), val.molecular_score, val[\"sample\"])\n", "\n", "train, val" ] @@ -761,23 +868,23 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ - "# Write\n", - "# TODO: Wrap this in a function with appropriate default arguments\n", - "mode = \"error\"\n", - "tr_filename = os.path.join(\"data\", \"train_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))\n", - "val_filename = os.path.join(\"data\", \"val_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))\n", - "train.write.mode(mode).save(tr_filename, format=\"parquet\")\n", - "val.write.mode(mode).save(val_filename, format=\"parquet\")" + "# Save train and validation DataFrames.\n", + "save(train, tr_filename, sample_size, grayscale)\n", + "save(val, val_filename, sample_size, grayscale)" ] }, { "cell_type": "markdown", "metadata": { - "collapsed": true + "collapsed": true, + "deletable": true, + "editable": true }, "source": [ "---" @@ -785,14 +892,20 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "# Sample Data" ] }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "deletable": true, + "editable": true + }, "source": [ "### TODO: Wrap this in a function with appropriate default arguments" ] @@ -801,25 +914,28 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ - "tr_filename = os.path.join(\"data\", \"train_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))\n", - "val_filename = os.path.join(\"data\", \"val_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))\n", - "train = sqlContext.read.load(tr_filename)\n", - "val = sqlContext.read.load(val_filename)" + "# Load train and validation DataFrames from disk.\n", + "train = spark.read.load(tr_filename)\n", + "val = spark.read.load(val_filename)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ - "# Take a stratified sample\n", + "# Take a stratified sample.\n", "p=0.01\n", "train_sample = train.drop(\"__INDEX\").sampleBy(\"tumor_score\", fractions={1: p, 2: p, 3: p}, seed=42)\n", "val_sample = val.drop(\"__INDEX\").sampleBy(\"tumor_score\", fractions={1: p, 2: p, 3: p}, seed=42)\n", @@ -830,62 +946,62 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": true + "collapsed": false, + "deletable": true, + "editable": true }, "outputs": [], "source": [ - "# TODO: turn this into a function\n", - "# Repartition to get ~128MB 
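----------------------------------------------------------------------
`sampleBy` above draws each tumor_score stratum independently at fraction
p, preserving the class balance of the full dataset. On a toy DataFrame
(again assuming a `spark` session):

    p = 0.5
    toy = spark.createDataFrame([(i, 1 + i % 3) for i in range(300)],
                                ["slide_num", "tumor_score"])
    sample = toy.sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42)
    sample.groupBy("tumor_score").count().show()  # roughly 50 rows per stratum
----------------------------------------------------------------------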
partitions\n", - "\n", - "# TODO: Update logic to use the following to automatically\n", - "# select the number of partitions:\n", - "# ex_mb = SIZE*SIZE*CHANNELS * 8 / 1024 / 1024 # size of one example in MB\n", - "# ideal_part_size_mb = 128 # 128 MB partitions sizes are empirically ideal\n", - "# ideal_exs_per_part = round(ideal_part_size_mb / ex_mb)\n", - "# tr_parts = round(tc / ideal_exs_per_part)\n", - "# val_parts = round(vc / ideal_exs_per_part)\n", - "\n", - "if grayscale:\n", - " train_sample = train_sample.repartition(150) #300) #3000)\n", - " val_sample = val_sample.repartition(40) #80) #800)\n", - "else: # 3x\n", - " train_sample = train_sample.repartition(450) #900) #9000)\n", - " val_sample = val_sample.repartition(120) #240) #2400)\n", - "\n", - "# Reassign row indices\n", + "# Reassign row indices.\n", + "# TODO: Wrap this in a function with appropriate default arguments.\n", "train_sample = (\n", " train_sample.rdd\n", " .zipWithIndex()\n", " .map(lambda r: (r[1] + 1, *r[0]))\n", " .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n", - "train_sample = train_sample.select(train_sample[\"__INDEX\"].astype(\"int\"), train_sample.slide_num.astype(\"int\"), \n", - " train_sample.tumor_score.astype(\"int\"), train_sample.molecular_score, train_sample[\"sample\"])\n", + "train_sample = train_sample.select(train_sample[\"__INDEX\"].astype(\"int\"),\n", + " train_sample.slide_num.astype(\"int\"), \n", + " train_sample.tumor_score.astype(\"int\"),\n", + " train_sample.molecular_score,\n", + " train_sample[\"sample\"])\n", "\n", "val_sample = (\n", " val_sample.rdd\n", " .zipWithIndex()\n", " .map(lambda r: (r[1] + 1, *r[0]))\n", " .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n", - "val_sample = val_sample.select(val_sample[\"__INDEX\"].astype(\"int\"), val_sample.slide_num.astype(\"int\"), \n", - " val_sample.tumor_score.astype(\"int\"), val_sample.molecular_score, val_sample[\"sample\"])\n", - "\n", - "train_sample, val_sample\n", + "val_sample = val_sample.select(val_sample[\"__INDEX\"].astype(\"int\"),\n", + " val_sample.slide_num.astype(\"int\"), \n", + " val_sample.tumor_score.astype(\"int\"),\n", + " val_sample.molecular_score,\n", + " val_sample[\"sample\"])\n", "\n", - "# Write\n", - "# TODO: Wrap this in a function with appropriate default arguments\n", - "mode = \"error\"\n", - "tr_sample_filename = os.path.join(\"data\", \"train_{}_sample_{}{}.parquet\".format(p, sample_size, \"_grayscale\" if grayscale else \"\"))\n", - "val_sample_filename = os.path.join(\"data\", \"val_{}_sample_{}{}.parquet\".format(p, sample_size, \"_grayscale\" if grayscale else \"\"))\n", - "train_sample.write.mode(mode).save(tr_sample_filename, format=\"parquet\")\n", - "val_sample.write.mode(mode).save(val_sample_filename, format=\"parquet\")" + "train_sample, val_sample" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "deletable": true, + "editable": true + }, + "outputs": [], + "source": [ + "# Save train and validation DataFrames.\n", + "tr_sample_filename = \"train_{}_sample_{}{}.parquet\".format(p, sample_size, \"_grayscale\" if grayscale else \"\")\n", + "val_sample_filename = \"val_{}_sample_{}{}.parquet\".format(p, sample_size, \"_grayscale\" if grayscale else \"\")\n", + "save(train_sample, tr_sample_filename, sample_size, grayscale)\n", + "save(val_sample, val_sample_filename, sample_size, grayscale)" ] } ], "metadata": { "kernelspec": { - "display_name": "Python 3", + 
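----------------------------------------------------------------------
For reference, the repartitioning heuristic removed above (now superseded
by the file-size logic in `save`) works out as follows for RGB samples;
the tc/vc sample counts are hypothetical:

    size, channels = 256, 3
    ex_mb = size * size * channels * 8 / 1024 / 1024        # 1.5 MB per example
    ideal_part_size_mb = 128                                # empirically ideal size
    ideal_exs_per_part = round(ideal_part_size_mb / ex_mb)  # 85 examples/partition
    tc, vc = 38000, 10000                                   # hypothetical counts
    print(round(tc / ideal_exs_per_part))                   # ~447 train partitions
    print(round(vc / ideal_exs_per_part))                   # ~118 val partitions
----------------------------------------------------------------------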
"display_name": "Python 3 + Spark 2.x + SystemML", "language": "python", - "name": "python3" + "name": "pyspark3_2.x" }, "language_info": { "codemirror_mode": {
