Repository: incubator-systemml
Updated Branches:
  refs/heads/master 19b72fad1 -> b18908ad7
[SYSTEMML-1185] Updating Preprocessing Notebook

Updates to the Preprocessing notebook in prep for moving the preprocessing
code to a library.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b18908ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b18908ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b18908ad

Branch: refs/heads/master
Commit: b18908ad7d57199ddb78aef30eff932e16dec22e
Parents: 19b72fa
Author: Mike Dusenberry <[email protected]>
Authored: Mon Apr 3 14:14:07 2017 -0700
Committer: Mike Dusenberry <[email protected]>
Committed: Mon Apr 3 14:14:07 2017 -0700

----------------------------------------------------------------------
 projects/breast_cancer/Preprocessing.ipynb | 342 +++++++++++-------------
 1 file changed, 152 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b18908ad/projects/breast_cancer/Preprocessing.ipynb
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/Preprocessing.ipynb b/projects/breast_cancer/Preprocessing.ipynb
index eb107cf..90d71b8 100644
--- a/projects/breast_cancer/Preprocessing.ipynb
+++ b/projects/breast_cancer/Preprocessing.ipynb
@@ -481,85 +481,6 @@
     "editable": true
    },
    "source": [
-    "# Visualize Tile"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def visualize_tile(tile):\n",
-    "  \"\"\"\n",
-    "  Plot a tissue tile.\n",
-    "  \n",
-    "  Args:\n",
-    "    tile: A 3D NumPy array of shape (tile_size, tile_size, channels).\n",
-    "  \n",
-    "  Returns:\n",
-    "    None\n",
-    "  \"\"\"\n",
-    "  plt.imshow(tile)\n",
-    "  plt.show()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Visualize Sample"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def visualize_sample(sample, size=256):\n",
-    "  \"\"\"\n",
-    "  Plot a tissue sample.\n",
-    "  \n",
-    "  Args:\n",
-    "    sample: A square sample flattened to a vector of size\n",
-    "      (channels*size_x*size_y).\n",
-    "    size: The width and height of the square samples.\n",
-    "  \n",
-    "  Returns:\n",
-    "    None\n",
-    "  \"\"\"\n",
-    "  # Change type, reshape, transpose to (size_x, size_y, channels).\n",
-    "  length = sample.shape[0]\n",
-    "  channels = int(length / (size * size))\n",
-    "  if channels > 1:\n",
-    "    sample = sample.astype('uint8').reshape((channels, size, size)).transpose(1,2,0)\n",
-    "    plt.imshow(sample)\n",
-    "  else:\n",
-    "    vmax = 255 if sample.max() > 1 else 1\n",
-    "    sample = sample.reshape((size, size))\n",
-    "    plt.imshow(sample, cmap=\"gray\", vmin=0, vmax=vmax)\n",
-    "  plt.show()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
     "# Get Ground Truth Labels"
    ]
   },
@@ -585,38 +506,11 @@
     "  Returns:\n",
     "    A Pandas DataFrame containing the ground truth labels for each slide.\n",
     "  \"\"\"\n",
-    "  filename = os.path.join(folder, \"training_ground_truth.csv\")\n",
-    "  labels = pd.read_csv(filename, names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
-    "  labels[\"slide_num\"] = range(1, 501)\n",
-    "  return labels"
-   ]
-  },
-  {
"cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": false, - "deletable": true, - "editable": true - }, - "outputs": [], - "source": [ - "def create_ground_truth_maps(labels_df):\n", - " \"\"\"\n", - " Create lookup maps for ground truth labels.\n", - " \n", - " Args:\n", - " labels_df: A Pandas DataFrame containing the ground truth labels for\n", - " each slide.\n", - "\n", - " Returns:\n", - " A tuple of dictionaries mapping from the slide number to the\n", - " tumor score and to the molecular score.\n", - " \"\"\"\n", - " # Create slide_num -> tumor_score, and slide_num -> molecular_score dictionaries.\n", - " tumor_score_dict = {int(s): int(l) for s,l in zip(labels_df.slide_num, labels_df.tumor_score)}\n", - " molecular_score_dict = {int(s): float(l) for s,l in zip(labels_df.slide_num, labels_df.molecular_score)}\n", - " return tumor_score_dict, molecular_score_dict" + " filepath = os.path.join(folder, \"training_ground_truth.csv\")\n", + " labels_df = pd.read_csv(filepath, names=[\"tumor_score\", \"molecular_score\"], header=None)\n", + " labels_df[\"slide_num\"] = labels_df.index + 1 # slide numbering starts at 1\n", + " labels_df.set_index(\"slide_num\", drop=False, inplace=True) # use the slide num as index\n", + " return labels_df" ] }, { @@ -674,34 +568,42 @@ " molecular score, and the sample stretched out into a Vector.\n", " \"\"\"\n", " slides = sc.parallelize(slide_nums)\n", - " # Force even partitioning by collecting and parallelizing -- for memory issues.\n", - " # TODO: Explore computing the ideal paritition sizes based on projected number\n", - " # of tiles after filtering.\n", - " ## HACK Note: This was a PySpark bug with a fix in the master branch now.\n", - " #tile_indices = slides.flatMap(\n", - " # lambda slide: process_slide(slide, folder, training, tile_size, overlap)).collect()\n", - " #tile_indices = sc.parallelize(tile_indices, num_partitions)\n", - " ## END HACK -- update later\n", + " # Create DataFrame of all tile locations and increase number of partitions\n", + " # to avoid OOM during subsequent processing.\n", " tile_indices = (slides.flatMap(\n", " lambda slide: process_slide(slide, folder, training, tile_size, overlap)))\n", + " # TODO: Explore computing the ideal paritition sizes based on projected number\n", + " # of tiles after filtering. I.e. 
@@ -674,34 +568,42 @@
     "    molecular score, and the sample stretched out into a Vector.\n",
     "  \"\"\"\n",
     "  slides = sc.parallelize(slide_nums)\n",
-    "  # Force even partitioning by collecting and parallelizing -- for memory issues.\n",
-    "  # TODO: Explore computing the ideal paritition sizes based on projected number\n",
-    "  # of tiles after filtering.\n",
-    "  ## HACK Note: This was a PySpark bug with a fix in the master branch now.\n",
-    "  #tile_indices = slides.flatMap(\n",
-    "  #  lambda slide: process_slide(slide, folder, training, tile_size, overlap)).collect()\n",
-    "  #tile_indices = sc.parallelize(tile_indices, num_partitions)\n",
-    "  ## END HACK -- update later\n",
+    "  # Create DataFrame of all tile locations and increase number of partitions\n",
+    "  # to avoid OOM during subsequent processing.\n",
     "  tile_indices = (slides.flatMap(\n",
     "      lambda slide: process_slide(slide, folder, training, tile_size, overlap)))\n",
+    "  # TODO: Explore computing the ideal partition sizes based on projected number\n",
+    "  # of tiles after filtering.  I.e. something like the following:\n",
+    "  #rows = tile_indices.count()\n",
+    "  #part_size = 128\n",
+    "  #channels = 1 if grayscale else 3\n",
+    "  #row_mb = tile_size * tile_size * channels * 8 / 1024 / 1024  # size of one row in MB\n",
+    "  #rows_per_part = round(part_size / row_mb)\n",
+    "  #num_parts = rows / rows_per_part\n",
+    "  ## HACK: Force even partitioning by collecting and parallelizing -- for memory issues.\n",
+    "  ## Note: This was a PySpark bug with a fix in the master branch now.\n",
+    "  #tile_indices = tile_indices.collect()\n",
+    "  #tile_indices = sc.parallelize(tile_indices, num_partitions)\n",
+    "  ## END HACK\n",
     "  tile_indices = tile_indices.repartition(num_partitions)\n",
     "  tile_indices.cache()\n",
+    "  # Extract all tiles into a DataFrame, filter, and cut into smaller samples.\n",
     "  tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))\n",
     "  filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))\n",
     "  samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))\n",
     "  if training:\n",
+    "    # Append labels.\n",
     "    labels_df = get_labels_df(folder)\n",
-    "    tumor_score_dict, molecular_score_dict = create_ground_truth_maps(labels_df)\n",
     "    samples_with_labels = (samples.map(\n",
-    "        lambda tup: (tup[0], tumor_score_dict[tup[0]],\n",
-    "                     molecular_score_dict[tup[0]], Vectors.dense(tup[1]))))\n",
+    "        lambda tup: (tup[0], int(labels_df.at[tup[0],\"tumor_score\"]),\n",
+    "                     float(labels_df.at[tup[0],\"molecular_score\"]), Vectors.dense(tup[1]))))\n",
     "    df = samples_with_labels.toDF([\"slide_num\", \"tumor_score\", \"molecular_score\", \"sample\"])\n",
     "    df = df.select(df.slide_num.astype(\"int\"), df.tumor_score.astype(\"int\"),\n",
     "                   df.molecular_score, df[\"sample\"])\n",
     "  else:  # testing data -- no labels\n",
     "    df = samples.toDF([\"slide_num\", \"sample\"])\n",
     "    df = df.select(df.slide_num.astype(\"int\"), df[\"sample\"])\n",
-    "  #df = df.repartition(num_partitions)  # Even out the partitions\n",
+    "  #df = df.repartition(num_partitions)  # HACK: Even out the partitions to avoid issues during saving\n",
     "  return df"
    ]
   },
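
The commented-out TODO above sizes partitions from the projected per-row
footprint rather than using a fixed `num_partitions`. A rough standalone
sketch of that arithmetic (assuming 8-byte double values, as the comments do;
`estimate_num_partitions` is not a function in the notebook):

    import math

    def estimate_num_partitions(num_rows, tile_size, grayscale, part_size_mb=128):
      """Choose a partition count so ~part_size_mb MB partitions hold whole rows."""
      channels = 1 if grayscale else 3
      row_mb = tile_size * tile_size * channels * 8 / 1024 / 1024  # MB per row
      rows_per_part = max(1, round(part_size_mb / row_mb))
      return math.ceil(num_rows / rows_per_part)

    # E.g. 10,000 RGB tiles at 1024x1024: 24 MB/row -> 5 rows/partition -> 2,000 partitions.
    assert estimate_num_partitions(10000, 1024, grayscale=False) == 2000
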
@@ -725,55 +627,35 @@
    },
    "outputs": [],
    "source": [
-    "def train_val_split(df, slide_nums, folder, add_row_indices):\n",
+    "def train_val_split(df, slide_nums, folder, train_frac=0.8, add_row_indices=True, seed=None, debug=False):\n",
     "  \"\"\"\n",
-    "  Save a preprocessed DataFrame with a constraint on the file sizes.\n",
+    "  Split a DataFrame of slide samples into training and validation sets.\n",
     "  \n",
     "  Args:\n",
-    "    df: A DataFrame.\n",
+    "    df: A DataFrame in which each row contains the slide number,\n",
+    "      tumor score, molecular score, and the sample stretched out into\n",
+    "      a Vector.\n",
     "    slide_nums: A list of slide numbers to sample from.\n",
     "    folder: Directory containing a `training_ground_truth.csv` file\n",
     "      containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
     "      labels for each slide.\n",
+    "    train_frac: Fraction of the data to assign to the training set, with\n",
+    "      `1 - train_frac` assigned to the validation set.\n",
     "    add_row_indices: Boolean for whether or not to prepend an index\n",
-    "      column contain the row index for use downstream by SystemML.\n",
+    "      column containing the row index for use downstream by SystemML.\n",
     "      The column name will be \"__INDEX\".\n",
     "  \n",
-    "    sample_size: The width and height of the square samples.\n",
-    "    grayscale: Whether or not to the samples are in grayscale format, rather\n",
-    "      than RGB.\n",
-    "    folder: HDFS directory in which to save the DataFrame.\n",
-    "    mode: Specifies the behavior of `df.write.mode` when the data already exists.\n",
-    "      Options include:\n",
-    "        * `append`: Append contents of this :class:`DataFrame` to existing data.\n",
-    "        * `overwrite`: Overwrite existing data.\n",
-    "        * `error`: Throw an exception if data already exists.\n",
-    "        * `ignore`: Silently ignore this operation if data already exists.\n",
-    "    format: The format in which to save the DataFrame.\n",
-    "    file_size: Size in MB of each saved file. 128 MB is an empirically ideal size.\n",
-    "  \n",
     "  Returns:\n",
-    "    A DataFrame in which each row contains the slide number, tumor score,\n",
-    "    molecular score, and the sample stretched out into a Vector.\n",
+    "    A tuple (train, val) of DataFrames, each in the same format as `df`,\n",
+    "    split randomly by slide number.\n",
     "  \"\"\"\n",
-    "  # Create DataFrame of labels.\n",
+    "  # Create DataFrame of labels for the given slide numbers.\n",
     "  labels_df = get_labels_df(folder)\n",
-    "\n",
-    "  # Create DataFrames of the slide numbers being used (i.e. without the broken ones)\n",
-    "  # and merge with labels.\n",
-    "  slide_nums_df = pd.DataFrame(slide_nums, columns=[\"slide_num\"])\n",
-    "  labeled_slide_nums_df = pd.merge(slide_nums_df, labels_df, how=\"inner\", on=\"slide_num\")\n",
-    "\n",
-    "  # DEBUG: Examine class distribution.\n",
-    "# for pdf in [labels_df, labeled_slide_nums_df]:\n",
-    "#   print(pdf.count())\n",
-    "#   print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
-    "#   print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False))\n",
-    "#   print()\n",
+    "  labels_df = labels_df.loc[slide_nums]\n",
     "  \n",
-    "  # Randomly split slides 80%/20% into train and validation sets.\n",
-    "  train_nums_df = labeled_slide_nums_df.sample(frac=0.8, random_state=24)\n",
-    "  val_nums_df = labeled_slide_nums_df.drop(train_nums_df.index)\n",
+    "  # Randomly split slides into train and validation sets based on `train_frac`.\n",
+    "  train_nums_df = labels_df.sample(frac=train_frac, random_state=seed)\n",
+    "  val_nums_df = labels_df.drop(train_nums_df.index)\n",
     "\n",
     "  train_nums = (spark.createDataFrame(train_nums_df)\n",
     "                .selectExpr(\"cast(slide_num as int)\")\n",
@@ -788,19 +670,20 @@
     "  train = df.join(F.broadcast(train_nums), on=\"slide_num\")\n",
     "  val = df.join(F.broadcast(val_nums), on=\"slide_num\")\n",
     "  \n",
-    "  # DEBUG: Sanity checks.\n",
-    "# assert len(pd.merge(train_nums_df, val_nums_df, on=\"slide_num\")) == 0\n",
-    "# assert train_nums.join(val_nums, on=\"slide_num\").count() == 0\n",
-    "# assert train.join(val, on=\"slide_num\").count() == 0\n",
-    "# # - Check distributions.\n",
-    "# for pdf in train_nums_df, val_nums_df:\n",
-    "#   print(pdf.count())\n",
-    "#   print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
-    "#   print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False), \"\\n\")\n",
-    "# # - Check total number of examples in each.\n",
-    "# print(train.count(), val.count())\n",
-    "# # - Check physical plans for broadcast join.\n",
-    "# print(train.explain(), val.explain())\n",
+    "  if debug:\n",
+    "    # DEBUG: Sanity checks.\n",
+    "    assert len(pd.merge(train_nums_df, val_nums_df, on=\"slide_num\")) == 0\n",
+    "    assert train_nums.join(val_nums, on=\"slide_num\").count() == 0\n",
+    "    assert train.join(val, on=\"slide_num\").count() == 0\n",
+    "    # - Check distributions.\n",
+    "    for pdf in train_nums_df, val_nums_df:\n",
+    "      print(pdf.count())\n",
+    "      print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
+    "      print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False), \"\\n\")\n",
+    "    # - Check total number of examples in each.\n",
+    "    print(train.count(), val.count())\n",
+    "    # - Check physical plans for broadcast join.\n",
+    "    print(train.explain(), val.explain())\n",
     "  \n",
     "  # Add row indices for use with SystemML.\n",
     "  if add_row_indices:\n",
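
`train_val_split` assigns whole slides to a single split and then restricts
the samples DataFrame with broadcast joins, so the large DataFrame itself is
never shuffled. A toy sketch of the pattern (assumes a SparkSession named
`spark`, as in the notebook):

    from pyspark.sql import functions as F

    df = spark.createDataFrame([(1, "a"), (1, "b"), (2, "c"), (3, "d")],
                               ["slide_num", "sample"])
    train_nums = spark.createDataFrame([(1,), (3,)], ["slide_num"])

    # Broadcasting the tiny slide-number table keeps all samples from a slide
    # in the same split and avoids shuffling `df`.
    train = df.join(F.broadcast(train_nums), on="slide_num")
    train.explain()  # the physical plan should show a broadcast join
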
@@ -828,7 +711,86 @@
     "editable": true
    },
    "source": [
-    "# Save"
+    "# Visualize Tile"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
+   },
+   "outputs": [],
+   "source": [
+    "def visualize_tile(tile):\n",
+    "  \"\"\"\n",
+    "  Plot a tissue tile.\n",
+    "  \n",
+    "  Args:\n",
+    "    tile: A 3D NumPy array of shape (tile_size, tile_size, channels).\n",
+    "  \n",
+    "  Returns:\n",
+    "    None\n",
+    "  \"\"\"\n",
+    "  plt.imshow(tile)\n",
+    "  plt.show()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
+   "source": [
+    "# Visualize Sample"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
+   },
+   "outputs": [],
+   "source": [
+    "def visualize_sample(sample, size=256):\n",
+    "  \"\"\"\n",
+    "  Plot a tissue sample.\n",
+    "  \n",
+    "  Args:\n",
+    "    sample: A square sample flattened to a vector of size\n",
+    "      (channels*size_x*size_y).\n",
+    "    size: The width and height of the square samples.\n",
+    "  \n",
+    "  Returns:\n",
+    "    None\n",
+    "  \"\"\"\n",
+    "  # Change type, reshape, transpose to (size_x, size_y, channels).\n",
+    "  length = sample.shape[0]\n",
+    "  channels = int(length / (size * size))\n",
+    "  if channels > 1:\n",
+    "    sample = sample.astype('uint8').reshape((channels, size, size)).transpose(1,2,0)\n",
+    "    plt.imshow(sample)\n",
+    "  else:\n",
+    "    vmax = 255 if sample.max() > 1 else 1\n",
+    "    sample = sample.reshape((size, size))\n",
+    "    plt.imshow(sample, cmap=\"gray\", vmin=0, vmax=vmax)\n",
+    "  plt.show()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
+   "source": [
+    "# Save DataFrame"
    ]
   },
   {
@@ -841,18 +803,16 @@
    },
    "outputs": [],
    "source": [
-    "def save(df, filename, sample_size=256, grayscale=False, folder=\"data\",\n",
-    "         mode=\"error\", format=\"parquet\", file_size=128):\n",
+    "def save(df, filepath, sample_size, grayscale, mode=\"error\", format=\"parquet\", file_size=128):\n",
     "  \"\"\"\n",
     "  Save a preprocessed DataFrame with a constraint on the file sizes.\n",
     "  \n",
     "  Args:\n",
     "    df: A DataFrame.\n",
-    "    filename: Name of the file to save.\n",
+    "    filepath: Hadoop-supported path at which to save the DataFrame.\n",
     "    sample_size: The width and height of the square samples.\n",
-    "    grayscale: Whether or not to the samples are in grayscale format, rather\n",
+    "    grayscale: Whether or not the samples are in grayscale format, rather\n",
     "      than RGB.\n",
-    "    folder: HDFS directory in which to save the DataFrame.\n",
     "    mode: Specifies the behavior of `df.write.mode` when the data already exists.\n",
     "      Options include:\n",
     "        * `append`: Append contents of this :class:`DataFrame` to existing data.\n",
     "        * `overwrite`: Overwrite existing data.\n",
     "        * `error`: Throw an exception if data already exists.\n",
     "        * `ignore`: Silently ignore this operation if data already exists.\n",
     "    format: The format in which to save the DataFrame.\n",
     "    file_size: Size in MB of each saved file. 128 MB is an empirically ideal size.\n",
     "  \"\"\"\n",
     "  channels = 1 if grayscale else 3\n",
     "  row_mb = sample_size * sample_size * channels * 8 / 1024 / 1024  # size of one row in MB\n",
     "  rows_per_file = round(file_size / row_mb)\n",
-    "  filepath = os.path.join(folder, filename)\n",
     "  df.write.option(\"maxRecordsPerFile\", rows_per_file).mode(mode).save(filepath, format=format)"
    ]
   },
   {
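
For the default settings below (256x256 RGB samples stored as 8-byte doubles),
the `maxRecordsPerFile` arithmetic in `save` targets roughly 128 MB files; a
quick standalone check of the numbers (not notebook code):

    sample_size, channels, file_size = 256, 3, 128  # RGB samples, 128 MB target
    row_mb = sample_size * sample_size * channels * 8 / 1024 / 1024  # 1.5 MB per row
    rows_per_file = round(file_size / row_mb)
    assert row_mb == 1.5 and rows_per_file == 85  # 85 rows * 1.5 MB = 127.5 MB per file
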
\"train_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\")\n", - "val_filename = \"val_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\")" + "# labels_df_path = os.path.join(folder, \"training_ground_truth.csv\")\n", + "save_folder = \"data\" # Hadoop-supported directory in which to save DataFrames\n", + "df_path = os.path.join(save_folder, \"samples_{}_{}{}.parquet\".format(\n", + " \"labels\" if training else \"testing\", sample_size, \"_grayscale\" if grayscale else \"\"))\n", + "train_df_path = os.path.join(save_folder, \"train_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))\n", + "val_df_path = os.path.join(save_folder, \"val_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))" ] }, { @@ -945,8 +907,7 @@ "source": [ "# Process all slides.\n", "df = preprocess(slide_nums, tile_size=tile_size, sample_size=sample_size, grayscale=grayscale,\n", - " training=training, num_partitions=num_partitions, folder=folder)\n", - "df" + " training=training, num_partitions=num_partitions, folder=folder)" ] }, { @@ -960,7 +921,7 @@ "outputs": [], "source": [ "# Save DataFrame of samples.\n", - "save(df, filename, sample_size, grayscale)" + "save(df, df_path, sample_size, grayscale)" ] }, { @@ -974,8 +935,7 @@ "outputs": [], "source": [ "# Load full DataFrame from disk.\n", - "filepath = os.path.join(\"data\", filename)\n", - "df = spark.read.load(filepath)" + "df = spark.read.load(df_path)" ] }, { @@ -989,7 +949,7 @@ "outputs": [], "source": [ "# Split into train and validation DataFrames based On slide number\n", - "train, val = train_val_split(df, slide_nums, folder, add_row_indices)" + "train, val = train_val_split(df, slide_nums, folder, train_frac, add_row_indices)" ] }, { @@ -1003,8 +963,8 @@ "outputs": [], "source": [ "# Save train and validation DataFrames.\n", - "save(train, tr_filename, sample_size, grayscale)\n", - "save(val, val_filename, sample_size, grayscale)" + "save(train, train_df_path, sample_size, grayscale)\n", + "save(val, val_df_path, sample_size, grayscale)" ] }, { @@ -1049,8 +1009,8 @@ "outputs": [], "source": [ "# Load train and validation DataFrames from disk.\n", - "train = spark.read.load(os.path.join(\"data\", tr_filename))\n", - "val = spark.read.load(os.path.join(\"data\", val_filename))" + "train = spark.read.load(train_df_path)\n", + "val = spark.read.load(val_df_path)" ] }, { @@ -1120,8 +1080,10 @@ "# Save train and validation DataFrames.\n", "tr_sample_filename = \"train_{}_sample_{}{}.parquet\".format(p, sample_size, \"_grayscale\" if grayscale else \"\")\n", "val_sample_filename = \"val_{}_sample_{}{}.parquet\".format(p, sample_size, \"_grayscale\" if grayscale else \"\")\n", - "save(train_sample, tr_sample_filename, sample_size, grayscale)\n", - "save(val_sample, val_sample_filename, sample_size, grayscale)" + "train_sample_path = os.path.join(\"save_folder\", tr_sample_filename)\n", + "val_sample_path = os.path.join(\"save_folder\", val_sample_filename)\n", + "save(train_sample, train_sample_path, sample_size, grayscale)\n", + "save(val_sample, val_sample_path, sample_size, grayscale)" ] } ],
