Repository: incubator-systemml
Updated Branches:
  refs/heads/master a6d7aa549 -> 420dd17be
[SYSTEMML-1185] Updating Preprocessing Notebook

Updating the breast cancer preprocessing notebook with a new function for
splitting the full DataFrame into train and validation DataFrames.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/420dd17b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/420dd17b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/420dd17b

Branch: refs/heads/master
Commit: 420dd17bee3adf5569cc848c7f0d62e3923bd769
Parents: a6d7aa5
Author: Mike Dusenberry <[email protected]>
Authored: Fri Mar 31 20:47:35 2017 -0700
Committer: Mike Dusenberry <[email protected]>
Committed: Fri Mar 31 20:47:35 2017 -0700

----------------------------------------------------------------------
 projects/breast_cancer/Preprocessing.ipynb | 326 ++++++++++++++----------
 1 file changed, 187 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/420dd17b/projects/breast_cancer/Preprocessing.ipynb
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/Preprocessing.ipynb b/projects/breast_cancer/Preprocessing.ipynb
index e5690a9..eb107cf 100644
--- a/projects/breast_cancer/Preprocessing.ipynb
+++ b/projects/breast_cancer/Preprocessing.ipynb
@@ -567,27 +567,55 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false,
+    "collapsed": true,
     "deletable": true,
     "editable": true
    },
    "outputs": [],
    "source": [
-    "def create_ground_truth_maps(folder):\n",
+    "def get_labels_df(folder):\n",
     "  \"\"\"\n",
-    "  Create lookup maps for ground truth labels.\n",
+    "  Create a DataFrame with the ground truth labels for each slide.\n",
     "  \n",
     "  Args:\n",
-    "    folder: Directory in which the slides folder is stored, as a string.\n",
-    "      This should contain a `training_ground_truth.csv` file.\n",
+    "    folder: Directory containing a `training_ground_truth.csv` file\n",
+    "      containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
+    "      labels for each slide.\n",
+    "\n",
+    "  Returns:\n",
+    "    A Pandas DataFrame containing the ground truth labels for each slide.\n",
     "  \"\"\"\n",
     "  filename = os.path.join(folder, \"training_ground_truth.csv\")\n",
     "  labels = pd.read_csv(filename, names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
     "  labels[\"slide_num\"] = range(1, 501)\n",
+    "  return labels"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
+   },
+   "outputs": [],
+   "source": [
+    "def create_ground_truth_maps(labels_df):\n",
+    "  \"\"\"\n",
+    "  Create lookup maps for ground truth labels.\n",
+    "  \n",
+    "  Args:\n",
+    "    labels_df: A Pandas DataFrame containing the ground truth labels for\n",
+    "      each slide.\n",
     "\n",
+    "  Returns:\n",
+    "    A tuple of dictionaries mapping from the slide number to the\n",
+    "    tumor score and to the molecular score.\n",
+    "  \"\"\"\n",
     "  # Create slide_num -> tumor_score, and slide_num -> molecular_score dictionaries.\n",
-    "  tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}\n",
-    "  molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}\n",
+    "  tumor_score_dict = {int(s): int(l) for s,l in zip(labels_df.slide_num, labels_df.tumor_score)}\n",
+    "  molecular_score_dict = {int(s): float(l) for s,l in zip(labels_df.slide_num, labels_df.molecular_score)}\n",
     "  return tumor_score_dict, molecular_score_dict"
    ]
   },
@@ -598,14 +626,14 @@
    "editable": true
   },
   "source": [
-   "# Process All Slides Into A Saved Spark DataFrame"
+   "# Process All Slides Into A Spark DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
-   "collapsed": false,
+   "collapsed": true,
    "deletable": true,
    "editable": true
   },
@@ -624,10 +652,13 @@
     "  \n",
     "  Args:\n",
     "    slide_nums: List of whole-slide numbers to process.\n",
-    "    folder: Local directory in which the slides folder is stored, as a string.\n",
-    "      This should contain either a `training_image_data` folder with\n",
-    "      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`\n",
-    "      folder with images in the format `TUPAC-TE-###.svs`.\n",
+    "    folder: Local directory in which the slides folder and ground truth\n",
+    "      file are stored, as a string. This should contain a\n",
+    "      `training_image_data` folder with images in the format\n",
+    "      `TUPAC-TR-###.svs`, as well as a `training_ground_truth.csv` file\n",
+    "      containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
+    "      labels for each slide. Alternatively, the folder should contain a\n",
+    "      `testing_image_data` folder with images in the format `TUPAC-TE-###.svs`.\n",
     "    training: Boolean for training or testing datasets.\n",
     "    tile_size: The width and height of a square tile to be generated.\n",
     "    overlap: Number of pixels by which to overlap the tiles.\n",
@@ -659,7 +690,8 @@
     "  filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))\n",
     "  samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))\n",
     "  if training:\n",
-    "    tumor_score_dict, molecular_score_dict = create_ground_truth_maps(folder)\n",
+    "    labels_df = get_labels_df(folder)\n",
+    "    tumor_score_dict, molecular_score_dict = create_ground_truth_maps(labels_df)\n",
     "    samples_with_labels = (samples.map(\n",
     "      lambda tup: (tup[0], tumor_score_dict[tup[0]],\n",
     "                   molecular_score_dict[tup[0]], Vectors.dense(tup[1]))))\n",
@@ -670,8 +702,145 @@
     "    df = samples.toDF([\"slide_num\", \"sample\"])\n",
     "    df = df.select(df.slide_num.astype(\"int\"), df[\"sample\"])\n",
     "  #df = df.repartition(num_partitions)  # Even out the partitions\n",
-    "  return df\n",
+    "  return df"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
+   "source": [
+    "# Split Into Separate Train & Validation DataFrames Based On Slide Number"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
+   },
+   "outputs": [],
+   "source": [
+    "def train_val_split(df, slide_nums, folder, add_row_indices):\n",
+    "  \"\"\"\n",
+    "  Split a preprocessed DataFrame of samples into train and validation\n",
+    "  DataFrames based on slide number.\n",
+    "  \n",
+    "  Args:\n",
+    "    df: A DataFrame in which each row contains the slide number, tumor\n",
+    "      score, molecular score, and the sample stretched out into a Vector.\n",
+    "    slide_nums: A list of slide numbers to sample from.\n",
+    "    folder: Directory containing a `training_ground_truth.csv` file\n",
+    "      containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
+    "      labels for each slide.\n",
+    "    add_row_indices: Boolean for whether or not to prepend an index\n",
+    "      column containing the row index for use downstream by SystemML.\n",
+    "      The column name will be \"__INDEX\".\n",
+    "  \n",
+    "  Returns:\n",
+    "    A `(train, val)` tuple of DataFrames, in which the slides have been\n",
+    "    randomly split 80%/20% into train and validation sets, respectively.\n",
+    "  \"\"\"\n",
+    "  # Create DataFrame of labels.\n",
+    "  labels_df = get_labels_df(folder)\n",
+    "\n",
+    "  # Create DataFrames of the slide numbers being used (i.e. without the broken ones)\n",
+    "  # and merge with labels.\n",
+    "  slide_nums_df = pd.DataFrame(slide_nums, columns=[\"slide_num\"])\n",
+    "  labeled_slide_nums_df = pd.merge(slide_nums_df, labels_df, how=\"inner\", on=\"slide_num\")\n",
+    "\n",
+    "  # DEBUG: Examine class distribution.\n",
+    "#   for pdf in [labels_df, labeled_slide_nums_df]:\n",
+    "#     print(pdf.count())\n",
+    "#     print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
+    "#     print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False))\n",
+    "#     print()\n",
+    "  \n",
+    "  # Randomly split slides 80%/20% into train and validation sets.\n",
+    "  train_nums_df = labeled_slide_nums_df.sample(frac=0.8, random_state=24)\n",
+    "  val_nums_df = labeled_slide_nums_df.drop(train_nums_df.index)\n",
+    "\n",
+    "  train_nums = (spark.createDataFrame(train_nums_df)\n",
+    "                     .selectExpr(\"cast(slide_num as int)\")\n",
+    "                     .coalesce(1))\n",
+    "  val_nums = (spark.createDataFrame(val_nums_df)\n",
+    "                   .selectExpr(\"cast(slide_num as int)\")\n",
+    "                   .coalesce(1))\n",
     "\n",
+    "  # Note: Explicitly mark the smaller DataFrames as able to be broadcasted\n",
+    "  # in order to have Catalyst choose the more efficient BroadcastHashJoin, \n",
+    "  # rather than the costly SortMergeJoin.\n",
+    "  train = df.join(F.broadcast(train_nums), on=\"slide_num\")\n",
+    "  val = df.join(F.broadcast(val_nums), on=\"slide_num\")\n",
+    "  \n",
+    "  # DEBUG: Sanity checks.\n",
+    "#   assert len(pd.merge(train_nums_df, val_nums_df, on=\"slide_num\")) == 0\n",
+    "#   assert train_nums.join(val_nums, on=\"slide_num\").count() == 0\n",
+    "#   assert train.join(val, on=\"slide_num\").count() == 0\n",
+    "#   # - Check distributions.\n",
+    "#   for pdf in train_nums_df, val_nums_df:\n",
+    "#     print(pdf.count())\n",
+    "#     print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
+    "#     print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False), \"\\\\n\")\n",
+    "#   # - Check total number of examples in each.\n",
+    "#   print(train.count(), val.count())\n",
+    "#   # - Check physical plans for broadcast join.\n",
+    "#   print(train.explain(), val.explain())\n",
+    "  \n",
+    "  # Add row indices for use with SystemML.\n",
+    "  if add_row_indices:\n",
+    "    train = (train.rdd\n",
+    "                  .zipWithIndex()\n",
+    "                  .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing\n",
+    "                  .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
+    "    train = train.select(train[\"__INDEX\"].astype(\"int\"), train.slide_num.astype(\"int\"), \n",
+    "                         train.tumor_score.astype(\"int\"), train.molecular_score, train[\"sample\"])\n",
+    "\n",
+    "    val = (val.rdd\n",
+    "              .zipWithIndex()\n",
+    "              .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing\n",
+    "              .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
+    "    val = val.select(val[\"__INDEX\"].astype(\"int\"), val.slide_num.astype(\"int\"),\n",
+    "                     val.tumor_score.astype(\"int\"), val.molecular_score, val[\"sample\"])\n",
+    "\n",
+    "  return train, val"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
+   "source": [
+    "# Save"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
+   },
+   "outputs": [],
+   "source": [
     "def save(df, filename, sample_size=256, grayscale=False, folder=\"data\",\n",
     "         mode=\"error\", format=\"parquet\", file_size=128):\n",
     "  \"\"\"\n",
@@ -756,6 +925,7 @@
     "sample_size = 256\n",
     "grayscale = False\n",
     "num_partitions = 20000\n",
+    "add_row_indices = True\n",
     "folder = \"/home/MDM/breast_cancer/data\"\n",
     "filename = \"samples_{}_{}{}.parquet\".format(\n",
     "    \"labels\" if training else \"testing\", sample_size, \"_grayscale\" if grayscale else \"\")\n",
@@ -794,26 +964,6 @@
    ]
   },
   {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Split Into Separate Train & Validation DataFrames Based On Slide Number"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "### TODO: Wrap this in a function with appropriate default arguments"
-   ]
-  },
-  {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
@@ -834,114 +984,12 @@
    "metadata": {
     "collapsed": false,
     "deletable": true,
-    "editable": true,
-    "scrolled": false
-   },
-   "outputs": [],
-   "source": [
-    "# Create DataFrame of labels.\n",
-    "labels = pd.read_csv(\n",
-    "    \"data/training_ground_truth.csv\", names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
-    "labels[\"slide_num\"] = range(1, 501)  # add slide num column\n",
-    "\n",
-    "# Create DataFrames of the slide numbers being used (i.e. without the broken ones)\n",
-    "# and merge with labels.\n",
-    "slide_nums_df = pd.DataFrame(slide_nums, columns=[\"slide_num\"])\n",
-    "labeled_slide_nums_df = pd.merge(slide_nums_df, labels, how=\"inner\", on=\"slide_num\")\n",
-    "\n",
-    "# Examine class distribution.\n",
-    "for pdf in [labels, labeled_slide_nums_df]:\n",
-    "  print(pdf.count())\n",
-    "  print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
-    "  print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False))\n",
-    "  print()"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": false,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "# Randomly split slides 80%/20% into train and validation sets.\n",
-    "train_nums_pdf = labeled_slide_nums_df.sample(frac=0.8, random_state=24)\n",
-    "val_nums_pdf = labeled_slide_nums_df.drop(train_nums_pdf.index)\n",
-    "\n",
-    "train_nums = (spark.createDataFrame(train_nums_pdf)\n",
-    "                   .selectExpr(\"cast(slide_num as int)\")\n",
-    "                   .coalesce(1))\n",
-    "val_nums = (spark.createDataFrame(val_nums_pdf)\n",
-    "                 .selectExpr(\"cast(slide_num as int)\")\n",
-    "                 .coalesce(1))\n",
-    "\n",
-    "# Note: Explicitly mark the smaller DataFrames as able to be broadcasted\n",
-    "# in order to have Catalyst choose the more efficient BroadcastHashJoin, \n",
-    "# rather than the costly SortMergeJoin.\n",
-    "train = df.join(F.broadcast(train_nums), on=\"slide_num\")\n",
-    "val = df.join(F.broadcast(val_nums), on=\"slide_num\")\n",
-    "\n",
-    "train, val"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": false,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "# Sanity checks.\n",
-    "assert len(pd.merge(train_nums_pdf, val_nums_pdf, on=\"slide_num\")) == 0\n",
-    "assert train_nums.join(val_nums, on=\"slide_num\").count() == 0\n",
-    "assert train.join(val, on=\"slide_num\").count() == 0\n",
-    "\n",
-    "# Check distributions.\n",
-    "for pdf in train_nums_pdf, val_nums_pdf:\n",
-    "  print(pdf.count())\n",
-    "  print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
-    "  print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False), \"\\\\n\")\n",
-    "\n",
-    "# Check total number of examples in each.\n",
-    "print(train.count(), val.count())\n",
-    "\n",
-    "# Check physical plans for broadcast join.\n",
-    "print(train.explain(), val.explain())"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": false,
-    "deletable": true,
    "editable": true
   },
   "outputs": [],
   "source": [
-    "# Add row indices for use with SystemML.\n",
-    "# TODO: Wrap this in a function with appropriate default arguments.\n",
-    "train = (train.rdd\n",
-    "              .zipWithIndex()\n",
-    "              .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing\n",
-    "              .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
-    "train = train.select(train[\"__INDEX\"].astype(\"int\"), train.slide_num.astype(\"int\"), \n",
-    "                     train.tumor_score.astype(\"int\"), train.molecular_score, train[\"sample\"])\n",
-    "\n",
-    "val = (val.rdd\n",
-    "          .zipWithIndex()\n",
-    "          .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing\n",
-    "          .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
-    "val = val.select(val[\"__INDEX\"].astype(\"int\"), val.slide_num.astype(\"int\"),\n",
-    "                 val.tumor_score.astype(\"int\"), val.molecular_score, val[\"sample\"])\n",
-    "\n",
-    "train, val"
+    "# Split into train and validation DataFrames based on slide number.\n",
+    "train, val = train_val_split(df, slide_nums, folder, add_row_indices)"
    ]
   },
   {
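----------------------------------------------------------------------

A note on the new split logic: the heart of train_val_split is a seeded
80%/20% pandas split of the slide numbers, followed by broadcast joins
against the full samples DataFrame. The sketch below is a minimal,
self-contained illustration of that technique; the tiny labels_df, the
stand-in samples df, and the local SparkSession are fabricated for
illustration and are not part of this commit.

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Fabricated ground truth labels for 10 slides (the notebook's
# get_labels_df reads these from `training_ground_truth.csv` instead).
labels_df = pd.DataFrame({
    "tumor_score": [1, 2, 3, 1, 2, 3, 1, 2, 3, 1],
    "molecular_score": [0.1 * i for i in range(1, 11)],
    "slide_num": list(range(1, 11)),
})

# Stand-in for the preprocessed DataFrame of (slide_num, sample) rows.
df = spark.createDataFrame([(n, float(n)) for n in range(1, 11)],
                           ["slide_num", "sample"])

# Randomly split slides 80%/20% into train and validation sets, seeded
# for reproducibility.
train_nums_df = labels_df.sample(frac=0.8, random_state=24)
val_nums_df = labels_df.drop(train_nums_df.index)

train_nums = spark.createDataFrame(train_nums_df).selectExpr("cast(slide_num as int)")
val_nums = spark.createDataFrame(val_nums_df).selectExpr("cast(slide_num as int)")

# Broadcast the small slide-number DataFrames so Catalyst chooses a
# BroadcastHashJoin over a shuffle-heavy SortMergeJoin.
train = df.join(F.broadcast(train_nums), on="slide_num")
val = df.join(F.broadcast(val_nums), on="slide_num")

print(train.count(), val.count())  # 8 2

Because only the slide numbers are broadcast, the large samples DataFrame
is never shuffled, which is the same reasoning given in the notebook's
comment about BroadcastHashJoin vs. SortMergeJoin.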
