Repository: incubator-systemml
Updated Branches:
  refs/heads/master 9c31c2ca7 -> b6a46500d


[SYSTEMML-1185] Updating Preprocessing Notebook

Updating the Preprocessing notebook to use a randomized train/val split,
rather than the previous deterministic split based on the original slide numbers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b6a46500
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b6a46500
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b6a46500

Branch: refs/heads/master
Commit: b6a46500df3eb764b79f887d085c8c6578f8c3fd
Parents: 9c31c2c
Author: Mike Dusenberry <[email protected]>
Authored: Fri Feb 24 17:42:56 2017 -0800
Committer: Mike Dusenberry <[email protected]>
Committed: Fri Feb 24 17:42:56 2017 -0800

----------------------------------------------------------------------
 projects/breast_cancer/Preprocessing.ipynb | 82 +++++++++++++++++++------
 1 file changed, 63 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b6a46500/projects/breast_cancer/Preprocessing.ipynb
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/Preprocessing.ipynb 
b/projects/breast_cancer/Preprocessing.ipynb
index 3db7560..2c2cc41 100644
--- a/projects/breast_cancer/Preprocessing.ipynb
+++ b/projects/breast_cancer/Preprocessing.ipynb
@@ -44,6 +44,7 @@
     "from openslide.deepzoom import DeepZoomGenerator\n",
     "import pandas as pd\n",
     "from pyspark.ml.linalg import Vectors\n",
+    "import pyspark.sql.functions as F\n",
     "from scipy.ndimage.morphology import binary_fill_holes\n",
     "from skimage.color import rgb2gray\n",
     "from skimage.feature import canny\n",
@@ -781,7 +782,7 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false,
+    "collapsed": true,
     "deletable": true,
     "editable": true
    },
@@ -798,24 +799,26 @@
    "metadata": {
     "collapsed": false,
     "deletable": true,
-    "editable": true,
-    "scrolled": true
+    "editable": true
    },
    "outputs": [],
    "source": [
-    "# Determine how to split data.\n",
+    "# Create DataFrame of labels.\n",
     "labels = pd.read_csv(\n",
     "    \"data/training_ground_truth.csv\", 
names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
-    "labels[\"slide_num\"] = range(1, 501)\n",
+    "labels[\"slide_num\"] = range(1, 501)  # add slide num column\n",
     "\n",
-    "# # Create slide_num -> tumor_score and slide_num -> molecular_score 
dictionaries\n",
-    "# tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, 
labels.tumor_score)}\n",
-    "# molecular_score_dict = {int(s): float(l) for s,l in 
zip(labels.slide_num, labels.molecular_score)}\n",
+    "# Create DataFrames of the slide numbers being used (i.e. without the 
broken ones)\n",
+    "# and merge with labels.\n",
+    "slide_nums_df = pd.DataFrame(slide_nums, columns=[\"slide_num\"])\n",
+    "labeled_slide_nums_df = pd.merge(slide_nums_df, labels, how=\"inner\", 
on=\"slide_num\")\n",
     "\n",
-    "print(labels[\"tumor_score\"].value_counts(sort=False))\n",
-    "print(labels[\"tumor_score\"].value_counts(normalize=True, 
sort=False))\n",
-    "print(labels[labels.slide_num <= 
400][\"tumor_score\"].value_counts(normalize=True, sort=False))\n",
-    "print(labels[labels.slide_num > 
400][\"tumor_score\"].value_counts(normalize=True, sort=False))"
+    "# Examine class distribution.\n",
+    "for pdf in [labels, labeled_slide_nums_df]:\n",
+    "  print(pdf.count())\n",
+    "  print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
+    "  print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False))\n",
+    "  print()"
    ]
   },
   {
@@ -828,10 +831,23 @@
    },
    "outputs": [],
    "source": [
-    "# Split data into train and validation sets.\n",
-    "# TODO: Stratified random split.\n",
-    "train = df.where(df.slide_num <= 400)\n",
-    "val = df.where(df.slide_num > 400)\n",
+    "# Randomly split slides 80%/20% into train and validation sets.\n",
+    "train_nums_pdf = labeled_slide_nums_df.sample(frac=0.8, 
random_state=24)\n",
+    "val_nums_pdf = labeled_slide_nums_df.drop(train_nums_pdf.index)\n",
+    "\n",
+    "train_nums = (spark.createDataFrame(train_nums_pdf)\n",
+    "                   .selectExpr(\"cast(slide_num as int)\")\n",
+    "                   .coalesce(1))\n",
+    "val_nums = (spark.createDataFrame(val_nums_pdf)\n",
+    "                 .selectExpr(\"cast(slide_num as int)\")\n",
+    "                 .coalesce(1))\n",
+    "\n",
+    "# Note: Explicitly mark the smaller DataFrames as able to be 
broadcasted\n",
+    "# in order to have Catalyst choose the more efficient BroadcastHashJoin, 
\n",
+    "# rather than the costly SortMergeJoin.\n",
+    "train = df.join(F.broadcast(train_nums), on=\"slide_num\")\n",
+    "val = df.join(F.broadcast(val_nums), on=\"slide_num\")\n",
+    "\n",
     "train, val"
    ]
   },
@@ -845,6 +861,34 @@
    },
    "outputs": [],
    "source": [
+    "# Sanity checks.\n",
+    "assert len(pd.merge(train_nums_pdf, val_nums_pdf, on=\"slide_num\")) == 
0\n",
+    "assert train_nums.join(val_nums, on=\"slide_num\").count() == 0\n",
+    "assert train.join(val, on=\"slide_num\").count() == 0\n",
+    "\n",
+    "# Check distributions.\n",
+    "for pdf in train_nums_pdf, val_nums_pdf:\n",
+    "  print(pdf.count())\n",
+    "  print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
+    "  print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False), 
\"\\n\")\n",
+    "\n",
+    "# Check total number of examples in each.\n",
+    "print(train.count(), val.count())\n",
+    "\n",
+    "# Check physical plans for broadcast join.\n",
+    "print(train.explain(), val.explain())"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
+   },
+   "outputs": [],
+   "source": [
     "# Add row indices for use with SystemML.\n",
     "# TODO: Wrap this in a function with appropriate default arguments.\n",
     "train = (train.rdd\n",
@@ -921,8 +965,8 @@
    "outputs": [],
    "source": [
     "# Load train and validation DataFrames from disk.\n",
-    "train = spark.read.load(tr_filename)\n",
-    "val = spark.read.load(val_filename)"
+    "train = spark.read.load(os.path.join(\"data\", tr_filename))\n",
+    "val = spark.read.load(os.path.join(\"data\", val_filename))"
    ]
   },
   {
@@ -983,7 +1027,7 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false,
+    "collapsed": true,
     "deletable": true,
     "editable": true
    },

Reply via email to