[SYSTEMML-1185] Enable generation of sampled data in preprocessing. This enables the generation of a 1% sampled DataFrame during the preprocessing phase of the breast cancer project.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/5412e2d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/5412e2d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/5412e2d7 Branch: refs/heads/master Commit: 5412e2d7507c554a47f0112c4cc61570ae1b77d5 Parents: bbca632 Author: Mike Dusenberry <[email protected]> Authored: Wed Apr 5 18:14:40 2017 -0700 Committer: Mike Dusenberry <[email protected]> Committed: Wed Apr 5 18:14:40 2017 -0700 ---------------------------------------------------------------------- projects/breast_cancer/preprocess.py | 90 +++++++++++++++---------------- 1 file changed, 45 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5412e2d7/projects/breast_cancer/preprocess.py ---------------------------------------------------------------------- diff --git a/projects/breast_cancer/preprocess.py b/projects/breast_cancer/preprocess.py index d789c97..95b9f36 100644 --- a/projects/breast_cancer/preprocess.py +++ b/projects/breast_cancer/preprocess.py @@ -100,49 +100,49 @@ save(val, val_df_path, sample_size, grayscale) # --- # -## Sample Data -### TODO: Wrap this in a function with appropriate default arguments -# -## Load train and validation DataFrames from disk. -#train = spark.read.load(train_df_path) -#val = spark.read.load(val_df_path) -# -## Take a stratified sample. -#p=0.01 -#train_sample = train.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42) -#val_sample = val.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42) -# -## Reassign row indices. -## TODO: Wrap this in a function with appropriate default arguments. 
-#train_sample = ( -# train_sample.rdd -# .zipWithIndex() -# .map(lambda r: (r[1] + 1, *r[0])) -# .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample'])) -#train_sample = train_sample.select(train_sample["__INDEX"].astype("int"), -# train_sample.slide_num.astype("int"), -# train_sample.tumor_score.astype("int"), -# train_sample.molecular_score, -# train_sample["sample"]) -# -#val_sample = ( -# val_sample.rdd -# .zipWithIndex() -# .map(lambda r: (r[1] + 1, *r[0])) -# .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample'])) -#val_sample = val_sample.select(val_sample["__INDEX"].astype("int"), -# val_sample.slide_num.astype("int"), -# val_sample.tumor_score.astype("int"), -# val_sample.molecular_score, -# val_sample["sample"]) -# -## Save train and validation DataFrames. -#tr_sample_filename = "train_{}_sample_{}{}.parquet".format(p, sample_size, -# "_grayscale" if grayscale else "") -#val_sample_filename = "val_{}_sample_{}{}.parquet".format(p, sample_size, -# "_grayscale" if grayscale else "") -#train_sample_path = os.path.join("save_folder", tr_sample_filename) -#val_sample_path = os.path.join("save_folder", val_sample_filename) -#save(train_sample, train_sample_path, sample_size, grayscale) -#save(val_sample, val_sample_path, sample_size, grayscale) +# Sample Data +## TODO: Wrap this in a function with appropriate default arguments + +# Load train and validation DataFrames from disk. +train = spark.read.load(train_df_path) +val = spark.read.load(val_df_path) + +# Take a stratified sample. +p=0.01 +train_sample = train.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42) +val_sample = val.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42) + +# Reassign row indices. +# TODO: Wrap this in a function with appropriate default arguments. 
+train_sample = ( + train_sample.rdd + .zipWithIndex() + .map(lambda r: (r[1] + 1, *r[0])) + .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample'])) +train_sample = train_sample.select(train_sample["__INDEX"].astype("int"), + train_sample.slide_num.astype("int"), + train_sample.tumor_score.astype("int"), + train_sample.molecular_score, + train_sample["sample"]) + +val_sample = ( + val_sample.rdd + .zipWithIndex() + .map(lambda r: (r[1] + 1, *r[0])) + .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample'])) +val_sample = val_sample.select(val_sample["__INDEX"].astype("int"), + val_sample.slide_num.astype("int"), + val_sample.tumor_score.astype("int"), + val_sample.molecular_score, + val_sample["sample"]) + +# Save train and validation DataFrames. +tr_sample_filename = "train_{}_sample_{}{}.parquet".format(p, sample_size, + "_grayscale" if grayscale else "") +val_sample_filename = "val_{}_sample_{}{}.parquet".format(p, sample_size, + "_grayscale" if grayscale else "") +train_sample_path = os.path.join(save_folder, tr_sample_filename) +val_sample_path = os.path.join(save_folder, val_sample_filename) +save(train_sample, train_sample_path, sample_size, grayscale) +save(val_sample, val_sample_path, sample_size, grayscale)
