Repository: incubator-systemml
Updated Branches:
  refs/heads/master 3d1f77ce2 -> e93fdd35b

[SYSTEMML-1465] Add stain normalization to breast cancer preprocessing.

Adding stain normalization of H&E histology slides for the breast cancer
project.

Closes #507.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e93fdd35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e93fdd35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e93fdd35

Branch: refs/heads/master
Commit: e93fdd35b82e95fd41023850939593d5d37c8512
Parents: 3d1f77c
Author: Mike Dusenberry <[email protected]>
Authored: Mon May 22 16:39:09 2017 -0700
Committer: Mike Dusenberry <[email protected]>
Committed: Mon May 22 16:39:09 2017 -0700

----------------------------------------------------------------------
 .../breast_cancer/breastcancer/preprocessing.py | 261 ++++++++++++++-----
 1 file changed, 200 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e93fdd35/projects/breast_cancer/breastcancer/preprocessing.py
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/breastcancer/preprocessing.py b/projects/breast_cancer/breastcancer/preprocessing.py
index 424d699..cfedf94 100644
--- a/projects/breast_cancer/breastcancer/preprocessing.py
+++ b/projects/breast_cancer/breastcancer/preprocessing.py
@@ -47,7 +47,7 @@ from skimage.morphology import binary_closing, binary_dilation, disk
 def open_slide(slide_num, folder, training):
   """
   Open a whole-slide image, given an image number.
-
+
   Args:
     slide_num: Slide image number as an integer.
     folder: Directory in which the slides folder is stored, as a string.
@@ -55,7 +55,7 @@ def open_slide(slide_num, folder, training):
       images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
       folder with images in the format `TUPAC-TE-###.svs`.
     training: Boolean for training or testing datasets.
-
+
   Returns:
     An OpenSlide object representing a whole-slide image.
   """
@@ -75,15 +75,15 @@ def open_slide(slide_num, folder, training):
 def create_tile_generator(slide, tile_size, overlap):
   """
   Create a tile generator for the given slide.
-
+
   This generator is able to extract tiles from the overall
   whole-slide image.
-
+
   Args:
     slide: An OpenSlide object representing a whole-slide image.
     tile_size: The width and height of a square tile to be generated.
     overlap: Number of pixels by which to overlap the tiles.
-
+
   Returns:
     A DeepZoomGenerator object representing the tile generator. Each
     extracted tile is a PIL Image with shape
@@ -100,18 +100,18 @@ def create_tile_generator(slide, tile_size, overlap):
 def get_20x_zoom_level(slide, generator):
   """
   Return the zoom level that corresponds to a 20x magnification.
-
+
   The generator can extract tiles from multiple zoom levels,
   downsampling by a factor of 2 per level from highest to lowest
   resolution.
-
+
   Args:
     slide: An OpenSlide object representing a whole-slide image.
     generator: A DeepZoomGenerator object representing a tile generator.
       Note: This generator is not a true "Python generator function", but
       rather is an object that is capable of extracting individual tiles.
-
+
   Returns:
     Zoom level corresponding to a 20x magnification, or as close as
     possible.
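
For context, the tiling helpers above compose outside of Spark roughly as
follows. This is a minimal sketch, not part of the commit; it assumes the
`openslide-python` package and a hypothetical local slide path:

    import numpy as np
    import openslide
    from openslide.deepzoom import DeepZoomGenerator

    slide = openslide.open_slide("TUPAC-TR-001.svs")  # hypothetical path
    generator = DeepZoomGenerator(slide, tile_size=1024, overlap=0,
                                  limit_bounds=True)
    # Each zoom level halves the resolution, so step down from the scan
    # magnification (e.g. 40x) to reach ~20x.
    mag = int(slide.properties[openslide.PROPERTY_NAME_OBJECTIVE_POWER])
    offset = int(round(np.log2(mag / 20)))  # 40x -> 1, 20x -> 0
    level = generator.level_count - 1 - offset
    tile = np.asarray(generator.get_tile(level, (0, 0)))  # uint8 RGB, (H, W, 3)
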
@@ -138,11 +138,11 @@ def get_20x_zoom_level(slide, generator):
 def process_slide(slide_num, folder, training, tile_size, overlap):
   """
   Generate all possible tile indices for a whole-slide image.
-
+
   Given a slide number, tile size, and overlap, generate
   all possible (slide_num, tile_size, overlap, zoom_level, col, row)
   indices.
-
+
   Args:
     slide_num: Slide image number as an integer.
     folder: Directory in which the slides folder is stored, as a string.
@@ -152,7 +152,7 @@ def process_slide(slide_num, folder, training, tile_size, overlap):
     training: Boolean for training or testing datasets.
     tile_size: The width and height of a square tile to be generated.
     overlap: Number of pixels by which to overlap the tiles.
-
+
   Returns:
     A list of (slide_num, tile_size, overlap, zoom_level, col, row)
     integer index tuples representing possible tiles to extract.
@@ -175,10 +175,10 @@ def process_slide(slide_num, folder, training, tile_size, overlap):
 def process_tile_index(tile_index, folder, training):
   """
   Generate a tile from a tile index.
-
+
   Given a (slide_num, tile_size, overlap, zoom_level, col, row) tile
   index, generate a (slide_num, tile) tuple.
-
+
   Args:
     tile_index: A (slide_num, tile_size, overlap, zoom_level, col, row)
       integer index tuple representing a tile to extract.
@@ -187,7 +187,7 @@ def process_tile_index(tile_index, folder, training):
       images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
       folder with images in the format `TUPAC-TE-###.svs`.
     training: Boolean for training or testing datasets.
-
+
   Returns:
     A (slide_num, tile) tuple, where slide_num is an integer, and tile
     is a 3D NumPy array of shape (tile_size, tile_size, channels) in
@@ -199,7 +199,7 @@ def process_tile_index(tile_index, folder, training):
   # Create tile generator.
   generator = create_tile_generator(slide, tile_size, overlap)
   # Generate tile.
-  tile = np.array(generator.get_tile(zoom_level, (col, row)))
+  tile = np.asarray(generator.get_tile(zoom_level, (col, row)))
   return (slide_num, tile)
@@ -208,10 +208,10 @@ def process_tile_index(tile_index, folder, training):
 def optical_density(tile):
   """
   Convert a tile to optical density values.
-
+
   Args:
     tile: A 3D NumPy array of shape (tile_size, tile_size, channels).
-
+
   Returns:
     A 3D NumPy array of shape (tile_size, tile_size, channels)
     representing optical density values.
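
The body of `optical_density` falls outside this hunk; a typical
Beer-Lambert style conversion is sketched below as an assumption, not
necessarily the exact implementation in this file:

    import numpy as np

    def od_sketch(tile, light_intensity=240.0):
      # Map 8-bit RGB intensities I to optical density -log(I/I0),
      # shifting by 1 to avoid log(0) on fully black pixels.
      tile = tile.astype(np.float64)
      return -np.log((tile + 1.0) / light_intensity)
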
@@ -225,20 +225,20 @@ def optical_density(tile):
 def keep_tile(tile_tuple, tile_size, tissue_threshold):
   """
   Determine if a tile should be kept.
-
+
   This filters out tiles based on size and a tissue percentage
   threshold, using a custom algorithm. If a tile has height & width
   equal to (tile_size, tile_size), and contains a percentage of tissue
   greater than or equal to the given threshold, then it will be kept;
   otherwise it will be filtered out.
-
+
   Args:
     tile_tuple: A (slide_num, tile) tuple, where slide_num is an
-      integer, and tile is a 3D NumPy array of shape
+      integer, and tile is a 3D NumPy array of shape
       (tile_size, tile_size, channels) in RGB format.
     tile_size: The width and height of a square tile to be generated.
     tissue_threshold: Tissue percentage threshold.
-
+
   Returns:
     A Boolean indicating whether or not a tile should be kept for
     future usage.
   """
@@ -246,7 +246,7 @@ def keep_tile(tile_tuple, tile_size, tissue_threshold):
   slide_num, tile = tile_tuple
   if tile.shape[0:2] == (tile_size, tile_size):
     tile_orig = tile
-
+
     # Check 1
     # Convert 3D RGB image to 2D grayscale image, from
     # 0 (dense tissue) to 1 (plain background).
@@ -272,7 +272,7 @@ def keep_tile(tile_tuple, tile_size, tissue_threshold):
     # Calculate percentage of tissue coverage.
     percentage = tile.mean()
     check1 = percentage >= tissue_threshold
-
+
     # Check 2
     # Convert to optical density values
     tile = optical_density(tile_orig)
@@ -285,39 +285,36 @@ def keep_tile(tile_tuple, tile_size, tissue_threshold):
     tile = binary_fill_holes(tile)
     percentage = tile.mean()
     check2 = percentage >= tissue_threshold
-
+
     return check1 and check2
   else:
     return False
 
 
-# Generate Flattened Samples From Tile
+# Generate Samples From Tile
 def process_tile(tile_tuple, sample_size, grayscale):
   """
   Process a tile into a group of smaller samples.
-
+
   Cut up a tile into smaller blocks of sample_size x sample_size pixels,
-  change the shape of each sample from (H, W, channels) to
-  (channels, H, W), then flatten each into a vector of length
-  channels*H*W.
-
+  each returned as a 3D NumPy array of shape
+  (sample_size, sample_size, channels); transposition and flattening
+  are now handled downstream by `flatten_sample`.
+
   Args:
     tile_tuple: A (slide_num, tile) tuple, where slide_num is an
-      integer, and tile is a 3D NumPy array of shape
+      integer, and tile is a 3D NumPy array of shape
       (tile_size, tile_size, channels).
     sample_size: The new width and height of the square samples to be
       generated.
     grayscale: Whether or not to generate grayscale samples, rather
      than RGB.
-
+
   Returns:
     A list of (slide_num, sample) tuples representing cut up tiles,
-    where each sample has been transposed from
-    (sample_size_x, sample_size_y, channels) to
-    (channels, sample_size_x, sample_size_y),
-    and flattened to a vector of length
-    (channels*sample_size_x*sample_size_y).
+    where each sample is a 3D NumPy array of shape
+    (sample_size_x, sample_size_y, channels).
   """
   slide_num, tile = tile_tuple
   if grayscale:
@@ -332,24 +329,166 @@ def process_tile(tile_tuple, sample_size, grayscale):
   #    (num_x, num_y, sample_size_x, sample_size_y, ch).
   # 3. Combine num_x and num_y into single axis, returning
   #    (num_samples, sample_size_x, sample_size_y, ch).
-  # 4. Swap axes from (num_samples, sample_size_x, sample_size_y, ch) to
-  #    (num_samples, ch, sample_size_x, sample_size_y).
-  # 5. Flatten samples into (num_samples, ch*sample_size_x*sample_size_y).
   samples = (tile.reshape((x // sample_size, sample_size, y // sample_size, sample_size, ch))
                  .swapaxes(1,2)
-                 .reshape((-1, sample_size, sample_size, ch))
-                 .transpose(0,3,1,2))
-  samples = samples.reshape(samples.shape[0], -1)
+                 .reshape((-1, sample_size, sample_size, ch)))
   samples = [(slide_num, sample) for sample in list(samples)]
   return samples
 
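
The reshape/swapaxes cutting trick above can be sanity-checked on a tiny
array; an illustrative sketch with a 4x4 RGB "tile" cut into 2x2 samples:

    import numpy as np

    tile = np.arange(4 * 4 * 3).reshape(4, 4, 3)
    samples = (tile.reshape(2, 2, 2, 2, 3)  # (num_x, size_x, num_y, size_y, ch)
                   .swapaxes(1, 2)          # (num_x, num_y, size_x, size_y, ch)
                   .reshape(-1, 2, 2, 3))   # (num_samples, size_x, size_y, ch)
    # The first sample is exactly the top-left 2x2 block of the tile.
    assert np.array_equal(samples[0], tile[0:2, 0:2, :])
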
+# Normalize staining
+
+def normalize_staining(sample_tuple, beta=0.15, alpha=1, light_intensity=255):
+  """
+  Normalize the staining of H&E histology slides.
+
+  This function normalizes the staining of H&E histology slides.
+
+  References:
+    - Macenko, Marc, et al. "A method for normalizing histology slides
+      for quantitative analysis." Biomedical Imaging: From Nano to Macro,
+      2009. ISBI'09. IEEE International Symposium on. IEEE, 2009.
+      - http://wwwx.cs.unc.edu/~mn/sites/default/files/macenko2009.pdf
+    - https://github.com/mitkovetta/staining-normalization
+
+  Args:
+    sample_tuple: A (slide_num, sample) tuple, where slide_num is an
+      integer, and sample is a 3D NumPy array of shape (H,W,C).
+
+  Returns:
+    A (slide_num, sample) tuple, where the sample is a 3D NumPy array
+    of shape (H,W,C) that has been stain normalized.
+  """
+  # Setup.
+  slide_num, sample = sample_tuple
+  x = np.asarray(sample)
+  h, w, c = x.shape
+  x = x.reshape(-1, c).astype(np.float64)  # shape (H*W, C)
+
+  # Reference stain vectors and stain saturations.  We will normalize all slides
+  # to these references.  To create these, grab the stain vectors and stain
+  # saturations from a desirable slide.
+
+  # Values in reference implementation for use with eigendecomposition approach, natural log,
+  # and `light_intensity=240`.
+  #stain_ref = np.array([0.5626, 0.2159, 0.7201, 0.8012, 0.4062, 0.5581]).reshape(3,2)
+  #max_sat_ref = np.array([1.9705, 1.0308]).reshape(2,1)
+
+  # SVD w/ log10, and `light_intensity=255`.
+  stain_ref = (np.array([0.54598845, 0.322116, 0.72385198, 0.76419107, 0.42182333, 0.55879629])
+                 .reshape(3,2))
+  max_sat_ref = np.array([0.82791151, 0.61137274]).reshape(2,1)
+
+  # Convert RGB to OD.
+  # Note: The original paper used log10, and the reference implementation used the natural log.
+  #OD = -np.log((x+1)/light_intensity)  # shape (H*W, C)
+  OD = -np.log10(x/light_intensity + 1e-8)
+
+  # Remove data with OD intensity less than beta.
+  # I.e. remove transparent pixels.
+  # Note: This needs to be checked per channel, rather than
+  # taking an average over all channels for a given pixel.
+  OD_thresh = OD[np.all(OD >= beta, 1), :]  # shape (K, C)
+
+  # Calculate eigenvectors.
+  # Note: We can either use eigenvector decomposition, or SVD.
+  #eigvals, eigvecs = np.linalg.eig(np.cov(OD_thresh.T))  # np.cov results in inf/nans
+  U, s, V = np.linalg.svd(OD_thresh, full_matrices=False)
+
+  # Extract two largest eigenvectors.
+  # Note: We swap the sign of the eigvecs here to be consistent
+  # with other implementations.  Both +/- eigvecs are valid, with
+  # the same eigenvalue, so this is okay.
+  #top_eigvecs = eigvecs[:, np.argsort(eigvals)[-2:]] * -1
+  top_eigvecs = V[0:2, :].T * -1  # shape (C, 2)
+
+  # Project thresholded optical density values onto plane spanned by
+  # 2 largest eigenvectors.
+  proj = np.dot(OD_thresh, top_eigvecs)  # shape (K, 2)
+
+  # Calculate angle of each point w.r.t. the first plane direction.
+  # Note: the argument order is `np.arctan2(y, x)`.
+  angles = np.arctan2(proj[:, 1], proj[:, 0])  # shape (K,)
+
+  # Find robust extremes (a and 100-a percentiles) of the angle.
+  min_angle = np.percentile(angles, alpha)
+  max_angle = np.percentile(angles, 100-alpha)
+
+  # Convert min/max vectors (extremes) back to optimal stains in OD space.
+  # This computes a set of axes for each angle onto which we can project
+  # the top eigenvectors.  This assumes that the projected values have
+  # been normalized to unit length.
+  extreme_angles = np.array(
+    [[np.cos(min_angle), np.cos(max_angle)],
+     [np.sin(min_angle), np.sin(max_angle)]]
+  )  # shape (2,2)
+  stains = np.dot(top_eigvecs, extreme_angles)  # shape (C, 2)
+
+  # Merge vectors with hematoxylin first, and eosin second, as a heuristic.
+  if stains[0, 0] < stains[0, 1]:
+    stains[:, [0, 1]] = stains[:, [1, 0]]  # swap columns
+
+  # Calculate saturations of each stain.
+  # Note: Here, we solve
+  #    OD = VS
+  #     S = V^{-1}OD
+  # where `OD` is the matrix of optical density values of our image,
+  # `V` is the matrix of stain vectors, and `S` is the matrix of stain
+  # saturations.  Since this is an overdetermined system, we use the
+  # least squares solver, rather than a direct solve.
+  sats, _, _, _ = np.linalg.lstsq(stains, OD.T)
+
+  # Normalize stain saturations to have same pseudo-maximum based on
+  # a reference max saturation.
+  max_sat = np.percentile(sats, 99, axis=1, keepdims=True)
+  sats = sats / max_sat * max_sat_ref
+
+  # Compute optimal OD values.
+  OD_norm = np.dot(stain_ref, sats)
+
+  # Recreate image.
+  # Note: If the image is immediately converted to uint8 with `.astype(np.uint8)`, it will
+  # not return the correct values due to the initial values being outside of [0,255].
+  # To fix this, we round to the nearest integer, and then clip to [0,255], which is the
+  # same behavior as Matlab.
+  #x_norm = np.exp(OD_norm) * light_intensity  # natural log approach
+  x_norm = 10**(-OD_norm) * light_intensity - 1e-8  # log10 approach
+  x_norm = np.clip(np.round(x_norm), 0, 255).astype(np.uint8)
+  x_norm = x_norm.T.reshape(h,w,c)
+  return (slide_num, x_norm)
+
+
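
A quick smoke test of the new function on synthetic data (illustrative
only; real samples come from `process_tile`, and `normalize_staining` is
assumed to be importable from `breastcancer.preprocessing`):

    import numpy as np
    from breastcancer.preprocessing import normalize_staining

    # A fake 256x256 RGB sample standing in for a real tile sample.
    sample = np.random.randint(0, 256, size=(256, 256, 3), dtype=np.uint8)
    slide_num, normalized = normalize_staining((1, sample))
    assert normalized.shape == (256, 256, 3) and normalized.dtype == np.uint8
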
+def flatten_sample(sample_tuple):
+  """
+  Flatten a (H,W,C) sample into a (C*H*W) row vector.
+
+  Transpose each sample from (H, W, channels) to (channels, H, W), then
+  flatten each into a vector of length channels*H*W.
+
+  Args:
+    sample_tuple: A (slide_num, sample) tuple, where slide_num is an
+      integer, and sample is a 3D NumPy array of shape (H,W,C).
+
+  Returns:
+    A (slide_num, sample) tuple, where the sample has been transposed
+    from (H,W,C) to (C,H,W), and flattened to a vector of length
+    (C*H*W).
+  """
+  slide_num, sample = sample_tuple
+  # 1. Swap axes from (sample_size_x, sample_size_y, ch) to
+  #    (ch, sample_size_x, sample_size_y).
+  # 2. Flatten sample into (ch*sample_size_x*sample_size_y).
+  flattened_sample = sample.transpose(2,0,1).reshape(-1)
+  return (slide_num, flattened_sample)
+
+
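
Because `flatten_sample` fixes a (C, H, W) memory layout before
flattening, recovering an image from a stored row vector is the transpose
in the other direction. A sketch with hypothetical 256x256 RGB sizes:

    import numpy as np
    from breastcancer.preprocessing import flatten_sample

    sample = np.random.randint(0, 256, size=(256, 256, 3), dtype=np.uint8)
    _, flat = flatten_sample((1, sample))                    # shape (3*256*256,)
    restored = flat.reshape(3, 256, 256).transpose(1, 2, 0)  # back to (H, W, C)
    assert np.array_equal(restored, sample)
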
 # Get Ground Truth Labels
 def get_labels_df(folder):
   """
   Create a DataFrame with the ground truth labels for each slide.
-
+
   Args:
     folder: Directory containing a `training_ground_truth.csv` file
       containing the ground truth "tumor_score" and "molecular_score"
@@ -369,17 +508,18 @@
 # Process All Slides Into A Spark DataFrame
 def preprocess(spark, slide_nums, folder="data", training=True, tile_size=1024, overlap=0,
-               tissue_threshold=0.9, sample_size=256, grayscale=False, num_partitions=20000):
+               tissue_threshold=0.9, sample_size=256, grayscale=False, normalize_stains=True,
+               num_partitions=20000):
   """
   Preprocess a set of whole-slide images.
-
+
   Preprocess a set of whole-slide images as follows:
     1. Tile the slides into tiles of size (tile_size, tile_size, 3).
     2. Filter the tiles to remove unnecessary tissue.
     3. Cut the remaining tiles into samples of size
        (sample_size, sample_size, ch), where `ch` is 1 if `grayscale`
        is true, or 3 otherwise.
+    4. Optionally stain-normalize the samples, and flatten each one
+       into a row vector.
-
+
   Args:
     spark: SparkSession.
     slide_nums: List of whole-slide numbers to process.
@@ -399,8 +539,9 @@
       generated.
     grayscale: Whether or not to generate grayscale samples, rather
       than RGB.
+    normalize_stains: Whether or not to apply stain normalization.
     num_partitions: Number of partitions to use during processing.
-
+
   Returns:
     A Spark DataFrame in which each row contains the slide number,
     tumor score, molecular score, and the sample stretched out into a
     Vector.
   """
@@ -418,17 +559,16 @@
   #row_mb = tile_size * tile_size * channels * 8 / 1024 / 1024  # size of one row in MB
   #rows_per_part = round(part_size / row_mb)
   #num_parts = rows / rows_per_part
-  ## HACK: Force even partitioning by collecting and parallelizing -- for memory issues.
-  ## Note: This was a PySpark bug with a fix in the master branch now.
-  #tile_indices = tile_indices.collect()
-  #tile_indices = sc.parallelize(tile_indices, num_partitions)
-  ## END HACK
   tile_indices = tile_indices.repartition(num_partitions)
   tile_indices.cache()
-  # Extract all tiles into a DataFrame, filter, and cut into smaller samples.
+  # Extract all tiles into a DataFrame, filter, cut into smaller samples, apply stain
+  # normalization, and flatten.
   tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))
   filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))
   samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))
+  if normalize_stains:
+    samples = samples.map(lambda sample: normalize_staining(sample))
+  samples = samples.map(lambda sample: flatten_sample(sample))
   if training:
     # Append labels
     labels_df = get_labels_df(folder)
@@ -441,7 +581,6 @@
   else:  # testing data -- no labels
     df = samples.toDF(["slide_num", "sample"])
     df = df.select(df.slide_num.astype("int"), df["sample"])
-    #df = df.repartition(num_partitions)  # HACK: Even out the partitions to avoid saving issues
   return df
 
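
Taken together, a typical driver for the updated pipeline might look like
the following sketch (the app name, slide numbers, and output paths are
hypothetical):

    from pyspark.sql import SparkSession
    from breastcancer.preprocessing import preprocess, save, train_val_split

    spark = SparkSession.builder.appName("breast_cancer_preprocessing").getOrCreate()

    # Tile, filter, cut, stain-normalize, and flatten the samples.
    df = preprocess(spark, slide_nums=list(range(1, 501)), folder="data",
                    training=True, tile_size=1024, sample_size=256,
                    grayscale=False, normalize_stains=True)

    # Split by slide into train/validation sets, then save.
    train, val = train_val_split(spark, df, slide_nums=list(range(1, 501)),
                                 folder="data")
    save(train, "data/train.parquet", sample_size=256, grayscale=False)
    save(val, "data/val.parquet", sample_size=256, grayscale=False)
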
@@ -451,7 +590,7 @@ def train_val_split(spark, df, slide_nums, folder, train_frac=0.8, add_row_indic
                     debug=False):
   """
   Split a DataFrame of slide samples into training and validation sets.
-
+
   Args:
     spark: SparkSession.
     df: A Spark DataFrame in which each row contains the slide number,
@@ -466,7 +605,7 @@
     add_row_indices: Boolean for whether or not to prepend an index
       column containing the row index for use downstream by SystemML.
       The column name will be "__INDEX".
-
+
   Returns:
     A Spark DataFrame in which each row contains the slide number,
     tumor score, molecular score, and the sample stretched out into a
     Vector.
   """
@@ -474,7 +613,7 @@
   # Create DataFrame of labels for the given slide numbers.
   labels_df = get_labels_df(folder)
   labels_df = labels_df.loc[slide_nums]
-
+
   # Randomly split slides 80%/20% into train and validation sets.
   train_nums_df = labels_df.sample(frac=train_frac, random_state=seed)
   val_nums_df = labels_df.drop(train_nums_df.index)
@@ -487,11 +626,11 @@
                 .coalesce(1))
 
   # Note: Explicitly mark the smaller DataFrames as able to be broadcasted
-  # in order to have Catalyst choose the more efficient BroadcastHashJoin,
+  # in order to have Catalyst choose the more efficient BroadcastHashJoin,
   # rather than the costly SortMergeJoin.
   train = df.join(F.broadcast(train_nums), on="slide_num")
   val = df.join(F.broadcast(val_nums), on="slide_num")
-
+
   if debug:
     # DEBUG: Sanity checks.
     assert len(pd.merge(train_nums_df, val_nums_df, on="slide_num")) == 0
@@ -506,14 +645,14 @@
     print(train.count(), val.count())
     # - Check physical plans for broadcast join.
     print(train.explain(), val.explain())
-
+
   # Add row indices for use with SystemML.
   if add_row_indices:
     train = (train.rdd
                   .zipWithIndex()
                   .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing
                   .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
-    train = train.select(train["__INDEX"].astype("int"), train.slide_num.astype("int"),
+    train = train.select(train["__INDEX"].astype("int"), train.slide_num.astype("int"),
                          train.tumor_score.astype("int"), train.molecular_score, train["sample"])
     val = (val.rdd
@@ -531,7 +670,7 @@ def save(df, filepath, sample_size, grayscale, mode="error", format="parquet", file_size=128):
   """
   Save a preprocessed DataFrame with a constraint on the file sizes.
-
+
   Args:
     df: A Spark DataFrame.
     filepath: Hadoop-supported path at which to save `df`.
