http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/scripts/algorithms/KM.dml ---------------------------------------------------------------------- diff --git a/scripts/algorithms/KM.dml b/scripts/algorithms/KM.dml index ae5d5dd..fbfa917 100644 --- a/scripts/algorithms/KM.dml +++ b/scripts/algorithms/KM.dml @@ -1,619 +1,619 @@ -#------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -#------------------------------------------------------------- - -# -# THIS SCRIPT ANALYZES SURVIVAL DATA USING KAPLAN-MEIER ESTIMATES -# -# INPUT PARAMETERS: -# --------------------------------------------------------------------------------------------- -# NAME TYPE DEFAULT MEANING -# --------------------------------------------------------------------------------------------- -# X String --- Location to read the input matrix X containing the survival data: -# timestamps, whether event occurred (1) or data is censored (0), and a number of factors (categorical features) -# for grouping and/or stratifying -# TE String --- Column indices of X which contain timestamps (first entry) and event information (second entry) -# GI String --- Column indices of X corresponding to the factors to be used for grouping -# SI String --- Column indices of X corresponding to the factors to be used for stratifying -# O String --- Location to write the matrix containing the results of the Kaplan-Meier analysis; see below for the description -# M String --- Location to write Matrix M containing the following statistics: total number of events, median and its confidence intervals; -# if survival data for multiple groups and strata are provided, each row of M contains the above statistics per group and stratum -# T String " " If survival data from multiple groups is available and ttype=log-rank or wilcoxon, -# location to write the matrix containing the result of the (stratified) test for comparing multiple groups -# alpha Double 0.05 Parameter to compute 100*(1-alpha)% confidence intervals for the survivor function and its median -# etype String "greenwood" Parameter to specify the error type according to "greenwood" (the default) or "peto" -# ctype String "log" Parameter to modify the confidence interval; "plain" keeps the lower and upper bound of -# the confidence interval unmodified, "log" (the default) corresponds to logistic transformation and -# "log-log" corresponds to the complementary log-log transformation -# ttype String "none" If survival data for multiple groups is available, specifies which test to perform for comparing -# survival data across multiple groups: "none" (the default), "log-rank", or "wilcoxon" test -# fmt String "text" The output format of results of the Kaplan-Meier analysis, such as "text" or "csv" -# 
--------------------------------------------------------------------------------------------- -# OUTPUT: -# 1- Matrix KM whose dimension depends on the number of groups (denoted by g) and strata (denoted by s) in the data: -# each collection of 7 consecutive columns in KM corresponds to a unique combination of groups and strata in the data with the following schema -# 1. col: timestamp -# 2. col: no. at risk -# 3. col: no. of events -# 4. col: Kaplan-Meier estimate of survivor function surv -# 5. col: standard error of surv -# 6. col: lower 100*(1-alpha)% confidence interval for surv -# 7. col: upper 100*(1-alpha)% confidence interval for surv -# 2- Matrix M whose dimension depends on the number of groups (g) and strata (s) in the data (k denotes the number of factors used for grouping -# ,i.e., ncol(GI) and l denotes the number of factors used for stratifying, i.e., ncol(SI)) -# M[,1:k]: unique combination of values in the k factors used for grouping -# M[,(k+1):(k+l)]: unique combination of values in the l factors used for stratifying -# M[,k+l+1]: total number of records -# M[,k+l+2]: total number of events -# M[,k+l+3]: median of surv -# M[,k+l+4]: lower 100*(1-alpha)% confidence interval of the median of surv -# M[,k+l+5]: upper 100*(1-alpha)% confidence interval of the median of surv -# If the number of groups and strata is equal to 1, M will have 4 columns with -# M[,1]: total number of events -# M[,2]: median of surv -# M[,3]: lower 100*(1-alpha)% confidence interval of the median of surv -# M[,4]: upper 100*(1-alpha)% confidence interval of the median of surv -# 3- If survival data from multiple groups available and ttype=log-rank or wilcoxon, a 1 x 4 matrix T and an g x 5 matrix T_GROUPS_OE with -# T_GROUPS_OE[,1] = no. of events -# T_GROUPS_OE[,2] = observed value (O) -# T_GROUPS_OE[,3] = expected value (E) -# T_GROUPS_OE[,4] = (O-E)^2/E -# T_GROUPS_OE[,5] = (O-E)^2/V -# T[1,1] = no. 
of groups -# T[1,2] = degree of freedom for Chi-squared distributed test statistic -# T[1,3] = test statistic -# T[1,4] = P-value -# ------------------------------------------------------------------------------------------- -# HOW TO INVOKE THIS SCRIPT - EXAMPLE: -# hadoop jar SystemML.jar -f KM.dml -nvargs X=INPUT_DIR/X TE=INPUT_DIR/TE GI=INPUT_DIR/GI SI=INPUT_DIR/SI O=OUTPUT_DIR/O -# M=OUTPUT_DIR/M T=OUTPUT_DIR/T alpha=0.05 etype=greenwood ctype=log fmt=csv - -fileX = $X; -fileTE = $TE; -fileGI = ifdef ($GI, " "); -fileSI = ifdef ($SI, " "); -fileO = $O; -fileM = $M; - -# Default values of some parameters -fileT = ifdef ($T, " "); # $T=" " - -fileG = ifdef ($G, " "); # $G=" " -fileS = ifdef ($S, " "); # $S=" " -fmtO = ifdef ($fmt, "text"); # $fmt="text" -alpha = ifdef ($alpha, 0.05); # $alpha=0.05 -err_type = ifdef ($etype, "greenwood"); # $etype="greenwood" -conf_type = ifdef ($ctype, "log"); # $ctype="log" -test_type = ifdef ($ttype, "none"); # $ttype="none" - -X = read (fileX); -TE = read (fileTE); -if (fileGI != " ") { - GI = read (fileGI); -} else { - GI = matrix (0, rows = 1, cols = 1); -} - -if (fileSI != " "){ - SI = read (fileSI); -} else { - SI = matrix (0, rows = 1, cols = 1); -} - -TE = t(TE); -GI = t(GI); -SI = t(SI); - -# check arguments for validity -if (err_type != "greenwood" & err_type != "peto") { - stop (err_type + " is not a valid error type!"); -} - -if (conf_type != "plain" & conf_type != "log" & conf_type != "log-log") { - stop (conf_type + " is not a valid confidence type!"); -} - -if (test_type != "log-rank" & test_type != "wilcoxon" & test_type != "none") { - stop (test_type + " is not a valid test type!"); -} - -n_group_cols = ncol (GI); -n_stratum_cols = ncol (SI); - -# check GI and SI for validity -GI_1_1 = as.scalar (GI[1,1]); -SI_1_1 = as.scalar (SI[1,1]); -if (n_group_cols == 1) { - if (GI_1_1 == 0) { # no factors for grouping - n_group_cols = 0; - } -} else if (GI_1_1 == 0) { - stop ("Matrix GI contains zero entries!"); -} -if (n_stratum_cols == 1) { - if (SI_1_1 == 0) { # no factors for stratifying - n_stratum_cols = 0; - } -} else if (SI_1_1 == 0) { - stop ("Matrix SI contains zero entries!"); -} - -if (2 + n_group_cols + n_stratum_cols > ncol (X)) { - stop ("X has an incorrect number of columns!"); -} - -# reorder cols of X -if (GI_1_1 == 0 & SI_1_1 == 0) { - Is = TE; -} else if (GI_1_1 == 0) { - Is = append (TE, SI); -} else if (SI_1_1 == 0) { - Is = append (TE, GI); -} else { - Is = append (TE, append (GI, SI)); -} -X = X %*% table (Is, seq (1, 2 + n_group_cols + n_stratum_cols), ncol (X), 2 + n_group_cols + n_stratum_cols); - -num_records = nrow (X); -num_groups = 1; -num_strata = 1; - -### compute group id for each record -print ("Perform grouping..."); -if (n_group_cols > 0) { - for (g in 1:n_group_cols) { # sort columns corresponding to groups sequentially - X = order (target = X, by = 2 + g); - } - XG = X[,3:(3 + n_group_cols - 1)]; - Idx = matrix (1, rows = num_records, cols = 1); - Idx[2:num_records,] = rowMaxs (ppred (X[1:(num_records - 1),3:(2 + n_group_cols)], X[2:num_records,3:(2 + n_group_cols)], "!=")); - num_groups = sum (Idx); - - XG = replace (target = XG, pattern = 0, replacement = "Infinity"); - XG = XG * Idx; - XG = replace (target = XG, pattern = "NaN", replacement = 0); - G_cols = removeEmpty (target = XG, margin = "rows"); - G_cols = replace (target = G_cols, pattern = "Infinity", replacement = 0); - - A = removeEmpty (target = diag (Idx), margin = "cols"); - if (ncol (A) > 1) { - A[,1:(ncol (A) - 1)] = A[,1:(ncol (A) - 1)] - 
A[,2:ncol (A)]; - B = cumsum (A); - Gi = B %*% seq(1, ncol(B)); # group ids - } else { # there is only one group - Gi = matrix (1, rows = num_records, cols = 1); - } - if (n_stratum_cols > 0) { - X = append (append (X[,1:2],Gi), X[,(3 + g):ncol (X)]); - } else { # no strata - X = append (X[,1:2],Gi); - } -} - -### compute stratum id for each record -print ("Perform stratifying..."); -if (n_stratum_cols > 0) { - s_offset = 2; - if (n_group_cols > 0) { - s_offset = 3; - } - for (s in 1:n_stratum_cols) { # sort columns corresponding to strata sequentially - X = order (target = X, by = s_offset + s); - } - XS = X[,(s_offset + 1):(s_offset + n_stratum_cols)]; - Idx = matrix (1, rows = num_records, cols = 1); - Idx[2:num_records,] = rowMaxs (ppred (X[1:(num_records - 1),(s_offset + 1):(s_offset + n_stratum_cols)], X[2:num_records,(s_offset + 1):(s_offset + n_stratum_cols)], "!=")); - num_strata = sum (Idx); - - XS = replace (target = XS, pattern = 0, replacement = "Infinity"); - XS = XS * Idx; - XS = replace (target = XS, pattern = "NaN", replacement = 0); - S_cols = removeEmpty (target = XS, margin = "rows"); - S_cols = replace (target = S_cols, pattern = "Infinity", replacement = 0); - - SB = removeEmpty (target = seq (1,num_records), margin = "rows", select = Idx); # indices of stratum boundaries - A = removeEmpty (target = diag (Idx), margin = "cols"); - if (ncol (A) > 1) { - A[,1:(ncol (A) - 1)] = A[,1:(ncol (A) - 1)] - A[,2:ncol (A)]; - B = cumsum (A); - Si = B %*% seq(1, ncol(B)); # stratum ids - } else { # there is only one stratum - Si = matrix (1, rows = num_records, cols = 1); - } - X = append (X[,1:3],Si); -} - -if (n_group_cols == 0 & n_stratum_cols == 0) { - X = append (X, matrix (1, rows = num_records, cols = 2)); - SB = matrix (1, rows = 1, cols = 1); -} else if (n_group_cols == 0) { - X = append (X[,1:2], append (matrix (1, rows = num_records, cols = 1), X[,3])); -} else if (n_stratum_cols == 0) { - X = append (X, matrix (1, rows = num_records, cols = 1)); - SB = matrix (1, rows = 1, cols = 1); -} - -######## BEGIN KAPLAN-MEIER ANALYSIS -print ("BEGIN KAPLAN-MEIER SURVIVAL FIT SCRIPT"); - -KM = matrix (0, rows = num_records, cols = num_groups * num_strata * 7); -KM_cols_select = matrix (1, rows = num_groups * num_strata * 7, cols = 1); -GSI = matrix (0, rows = num_groups * num_strata, cols = 2); -a = 1/0; -M = matrix (a, rows = num_groups * num_strata, cols = 5); -M_cols = seq (1, num_groups * num_strata); -z_alpha_2 = icdf (target = 1 - alpha / 2, dist = "normal"); - -if (num_groups > 1 & test_type != "none") { - str = ""; - TEST = matrix (0, rows = num_groups, cols = 5); - TEST_GROUPS_OE = matrix (0, rows = 1, cols = 4); - U = matrix (0, rows = num_groups, cols = num_strata); - U_OE = matrix (0, rows = num_groups, cols = num_strata); - OBS = matrix (0, rows = num_groups, cols = num_strata); - EXP = matrix (0, rows = num_groups, cols = num_strata); - V_sum_total = matrix (0, rows = num_groups-1, cols = (num_groups-1) * num_strata); - n_event_all_global = matrix(1, rows=num_groups, cols=num_strata); -} else if (num_groups == 1 & test_type != "none") { - stop ("Data contains only one group or no groups, at least two groups are required for test!"); -} - -parfor (s in 1:num_strata, check = 0) { - - start_ind = as.scalar (SB[s,]); - end_ind = num_records; - if (s != num_strata) { - end_ind = as.scalar (SB[s + 1,]) - 1; - } - - ######## RECODING TIMESTAMPS PRESERVING THE ORDER - - X_cur = X[start_ind:end_ind,]; - range = end_ind - start_ind + 1; - X_cur = order (target = X_cur, by = 
1); - Idx1 = matrix (1, rows = range, cols = 1); - - num_timestamps = 1; - if (range == 1) { - RT = matrix (1, rows = 1, cols = 1); - } else { - Idx1[2:range,1] = ppred (X_cur[1:(range - 1),1], X_cur[2:range,1], "!="); - num_timestamps = sum (Idx1); - A1 = removeEmpty (target = diag (Idx1), margin = "cols"); - if (ncol (A1) > 1) { - A1[,1:(ncol (A1) - 1)] = A1[,1:(ncol (A1) - 1)] - A1[,2:ncol (A1)]; - B1 = cumsum (A1); - RT = B1 %*% seq(1, ncol(B1)); - } else { # there is only one group - RT = matrix (1, rows = range, cols = 1); - } - } - - T = X_cur[,1]; - E = X_cur[,2]; - G = X_cur[,3]; - S = X_cur[,4]; - - n_event_stratum = aggregate (target = E, groups = RT, fn = "sum"); # no. of uncensored events per stratum - n_event_all_stratum = aggregate (target = E, groups = RT, fn = "count"); # no. both censored and uncensored of events per stratum - Idx1 = cumsum (n_event_all_stratum); - time_stratum = table (seq (1, nrow (Idx1), 1), Idx1) %*% T; # distinct timestamps both censored and uncensored per stratum - time_stratum_has_zero = sum (ppred (time_stratum, 0, "==")) > 0; - if (time_stratum_has_zero) { - time_stratum = replace (target = time_stratum, pattern = 0, replacement = "Infinity"); - } - n_time_all1 = nrow (n_event_stratum); # no. of distinct timestamps both censored and uncensored per stratum - n_event_all_stratum_agg = matrix (0, rows = n_time_all1, cols = 1); - if (n_time_all1 > 1) { - n_event_all_stratum_agg[2:n_time_all1,] = Idx1[1:(n_time_all1 - 1),]; - } - n_risk_stratum = range - n_event_all_stratum_agg; # no. at risk per stratum - - if (num_groups > 1 & test_type != "none") { # needed for log-rank or wilcoxon test - n_risk_n_event_stratum = matrix (0, rows = n_time_all1, cols = num_groups * 2); - } - - parfor (g in 1:num_groups, check = 0) { - - group_ind = ppred (G, g, "=="); - KM_offset = (s - 1) * num_groups * 7 + (g - 1) * 7; - M_offset = (s - 1) * num_groups + g; - if (sum (group_ind) != 0) { # group g is present in the stratum s - - GSI_offset = (s - 1) * num_groups + g; - GSI[GSI_offset,1] = g; - GSI[GSI_offset,2] = s; - E_cur = E * group_ind; - - ######## COMPUTE NO. AT RISK AND NO.OF EVENTS FOR EACH TIMESTAMP - - n_event = aggregate (target = E_cur, groups = RT, fn = "sum"); # no. of uncensored events per stratum per group - n_event_all = aggregate (target = group_ind, groups = RT, fn = "sum"); # no. of both censored and uncensored events per stratum per group - Idx1 = cumsum (n_event_all); - event_occurred = ppred (n_event, 0, ">"); - if (time_stratum_has_zero) { - time = replace (target = time_stratum * event_occurred, pattern = "NaN", replacement = 0); - time = removeEmpty (target = time, margin = "rows"); - time = replace (target = time, pattern = "Infinity", replacement = 0); - } else { - time = removeEmpty (target = time_stratum * event_occurred, margin = "rows"); - } - n_time_all2 = nrow (n_event); # no. of distinct timestamps both censored and uncensored per stratum per group - n_event_all_agg = matrix (0, rows = n_time_all2, cols = 1); - if (n_time_all2 > 1) { - n_event_all_agg[2:n_time_all2,] = Idx1[1:(n_time_all2 - 1),]; - } - - n_risk = sum (group_ind) - n_event_all_agg; # no. 
at risk per stratum per group - - if (num_groups > 1 & test_type != "none") { - n_risk_n_event_stratum[,(g - 1) * 2 + 1] = n_risk; - n_risk_n_event_stratum[,(g - 1) * 2 + 2] = n_event; - } - - # Extract only rows corresponding to events, i.e., for which n_event is nonzero - Idx1 = ppred (n_event, 0, "!="); - KM_1 = matrix (0, rows = n_time_all2, cols = 2); - KM_1[,1] = n_risk; - KM_1[,2] = n_event; - KM_1 = removeEmpty (target = KM_1, margin = "rows", select = Idx1); - n_risk = KM_1[,1]; - n_event = KM_1[,2]; - n_time = nrow (time); - - ######## ESTIMATE SURVIVOR FUNCTION SURV, ITS STANDARD ERROR SE_SURV, AND ITS 100(1-ALPHA)% CONFIDENCE INTERVAL - surv = cumprod ((n_risk - n_event) / n_risk); - tmp = n_event / (n_risk * (n_risk - n_event)); - se_surv = sqrt (cumsum (tmp)) * surv; - if (err_type == "peto") { - se_surv = (surv * sqrt(1 - surv) / sqrt(n_risk)); - } - - if (conf_type == "plain") { - # True survivor function is in [surv +- z_alpha_2 * se_surv], - # values less than 0 are replaced by 0, values larger than 1 are replaced by 1! - CI_l = max (surv - (z_alpha_2 * se_surv), 0); - CI_r = min (surv + (z_alpha_2 * se_surv), 1); - } else if (conf_type == "log") { - # True survivor function is in [surv * exp(+- z_alpha_2 * se_surv / surv)] - CI_l = max (surv * exp (- z_alpha_2 * se_surv / surv), 0); - CI_r = min (surv * exp ( z_alpha_2 * se_surv / surv), 1); - } else { # conf_type == "log-log" - # True survivor function is in [surv ^ exp(+- z_alpha_2 * se(log(-log(surv))))] - CI_l = max (surv ^ exp (- z_alpha_2 * se_surv / log(surv)), 0); - CI_r = min (surv ^ exp ( z_alpha_2 * se_surv / log(surv)), 1); - } - # - if (as.scalar (n_risk[n_time,]) == as.scalar (n_event[n_time,])) { - CI_l[n_time,] = 0/0; - CI_r[n_time,] = 0/0; - } - - n_event_sum = sum (n_event); - n_event_sum_all = sum(n_event_all); - if (n_event_sum > 0) { - # KM_offset = (s - 1) * num_groups * 7 + (g - 1) * 7; - KM[1:n_time,KM_offset + 1] = time; - KM[1:n_time,KM_offset + 2] = n_risk; - KM[1:n_time,KM_offset + 3] = n_event; - KM[1:n_time,KM_offset + 4] = surv; - KM[1:n_time,KM_offset + 5] = se_surv; - KM[1:n_time,KM_offset + 6] = CI_l; - KM[1:n_time,KM_offset + 7] = CI_r; - } - - ######## ESTIMATE MEDIAN OF SURVIVAL TIMES AND ITS 100(1-ALPHA)% CONFIDENCE INTERVAL - - p_5 = ppred (surv, 0.5, "<="); - pn_5 = sum (p_5); - #M_offset = (s - 1) * num_groups + g; - # if the estimated survivor function is larger than 0.5 for all timestamps, the median does not exist! 
- p_5_exists = (pn_5 != 0); - M[M_offset,2] = n_event_sum; - M[M_offset,1] = n_event_sum_all; - if (p_5_exists) { - if ( as.scalar (surv[n_time - pn_5 + 1,1]) == 0.5 ) { # if the estimated survivor function is exactly equal to 0.5 - if (pn_5 > 1) { - t_5 = as.scalar ((time[n_time - pn_5 + 1,1] + time[n_time - pn_5 + 2,1])/2); - } else { - t_5 = as.scalar (time[n_time - pn_5 + 1,1]); - } - } else { - t_5 = as.scalar (time[n_time - pn_5 + 1,1]); - } - - l_ind = ppred (CI_l, 0.5, "<="); - r_ind = ppred (CI_r, 0.5, "<="); - l_ind_sum = sum (l_ind); - r_ind_sum = sum (r_ind); - l_min_ind = as.scalar (rowIndexMin (t(l_ind))); - r_min_ind = as.scalar (rowIndexMin (t(r_ind))); - if (l_min_ind == n_time) { - if (l_ind_sum > 0) { - if (as.scalar (l_ind[n_time,1]) == 0) { # NA at last position - M[M_offset,4] = time[n_time - l_ind_sum,1]; - } else { - M[M_offset,4] = time[1,1]; - } - } - } else { - M[M_offset,4] = time[l_min_ind + 1,1]; - } - # - if (r_min_ind == n_time) { - if (r_ind_sum > 0) { - if (as.scalar (r_ind[n_time,1]) == 0) { # NA at last position - M[M_offset,5] = time[n_time - r_ind_sum,1]; - } else { - M[M_offset,5] = time[1,1]; - } - } - } else { - M[M_offset,5] = time[r_min_ind + 1,1]; - } - M[M_offset,3] = t_5; - if (test_type != "none"){ - n_event_all_global[g,s] = n_event_sum_all; - } - } - } else { - print ("group " + g + " is not present in the stratum " + s); - KM_cols_select[(KM_offset + 1):(KM_offset + 7),1] = matrix (0, rows = 7, cols = 1); - M_cols[M_offset,1] = 0; - } - } - - - ######## COMPARISON BETWEEN DIFFERENT GROUPS USING LOG-RANK OR WILCOXON TEST - - if (num_groups > 1 & test_type != "none") { - - V = matrix (0, rows = num_groups-1, cols = num_groups-1); - parfor (g in 0:(num_groups-1), check = 0) { - - n_risk = n_risk_n_event_stratum[,g * 2 + 1]; - n_event = n_risk_n_event_stratum[,g * 2 + 2]; - - if (test_type == "log-rank") { - O = n_event; - E = n_risk * n_event_stratum / n_risk_stratum; - } else { ### test_type == "wilcoxon" - O = n_risk_stratum * n_event / range; - E = n_risk * n_event_stratum / range; - } - U[(g + 1),s] = sum (O - E); - U_OE[g + 1, s] = (sum (O - E)*sum (O - E))/sum(E); - OBS[g + 1, s] = sum(O); - EXP[g + 1, s] = sum(E); - } - - # parfor (i1 in 0:(num_groups - 2), check = 0) { - for (i1 in 0:(num_groups - 2), check = 0) { - - n_risk = n_risk_n_event_stratum[,1 + i1 * 2]; - n_event = n_risk_n_event_stratum[,2 + i1 * 2]; - for (i2 in 0:(num_groups - 2)) { - - n_risk_i2j = n_risk_n_event_stratum[,1 + i2 * 2]; - I_i1i2 = 0; - if (i1 == i2) { - I_i1i2 = 1; - } - if (test_type == "log-rank") { - V1 = n_risk * n_event_stratum * (n_risk_stratum - n_event_stratum) / (n_risk_stratum * (n_risk_stratum - 1)); - V1 = replace (target = V1, pattern = "NaN", replacement = 0); - V2 = I_i1i2 - (n_risk_i2j / n_risk_stratum); - V[(i1 + 1),(i2 + 1)] = sum (V1 * V2); - } else { ### test_type == "wilcoxon" - V1 = (n_risk_stratum ^ 2) * (n_risk * n_event_stratum) * (n_risk_stratum - n_event_stratum) / (n_risk_stratum * (n_risk_stratum - 1)); - V1 = replace (target = V1, pattern = "NaN", replacement = 0); - V2 = I_i1i2 - (n_risk_i2j / n_risk_stratum); - V[(i1 + 1),(i2 + 1)] = sum (V1 * V2) / (range ^ 2); - } - } - } - V_start_ind = (s - 1) * (num_groups - 1) + 1; - V_sum_total[,V_start_ind:(V_start_ind + num_groups - 2)] = V; - } -} - -if (num_groups > 1 & test_type != "none") { - V_sum = matrix (0, rows = num_groups-1, cols = num_groups-1); - for (s in 1:num_strata) { - V_start_ind = (s - 1) * (num_groups - 1) + 1; - V_sum_total_part = 
V_sum_total[,V_start_ind:(V_start_ind + num_groups - 2)]; - V_sum = V_sum + V_sum_total_part; - } - - U_sum = rowSums (U); - - test_st = as.scalar (t(U_sum[1:(num_groups-1),1]) %*% inv(V_sum) %*% U_sum[1:(num_groups-1),1]); - p_val = 1 - cdf (target = test_st, dist = "chisq", df = num_groups-1 ); - if (test_type != "none") { - U_OE_sum = rowSums(U_OE); - V_OE =rowSums((U*U) /sum(V_sum)); - TEST_GROUPS_OE[1,1] = num_groups; - TEST_GROUPS_OE[1,2] = num_groups - 1; - TEST_GROUPS_OE[1,3] = test_st; - TEST_GROUPS_OE[1,4] = p_val; - TEST[,1] = rowSums(n_event_all_global); - TEST[,2] = rowSums(OBS); - TEST[,3] = rowSums(EXP); - TEST[,4] = rowSums(U_OE_sum); - TEST[,5] = rowSums(V_OE); - str = append (str, test_type + " test for " + num_groups + " groups: Chi-squared = " + test_st + " on " + (num_groups - 1) + " df, p = " + p_val + " "); - } -} - -GSI = removeEmpty (target = GSI, margin = "rows"); -if (n_group_cols > 0) { - # making a copy of unique groups before adding new rows depending on strata - G_cols_original = G_cols; - - GSI_1 = GSI[,1]; - tab_G = table (seq (1, nrow (GSI_1)), GSI_1, nrow (GSI_1), nrow (G_cols)); - G_cols = tab_G %*% G_cols; -} - -if (n_stratum_cols > 0) { - GSI_2 = GSI[,2]; - tab_S = table (seq (1, nrow (GSI_2)), GSI_2, nrow (GSI_2), nrow (S_cols)); - S_cols = tab_S %*% S_cols; -} - -# pull out non-empty rows from M -M_cols = removeEmpty (target = M_cols, margin = "rows"); -tab_M = table (seq (1, nrow (M_cols)), M_cols, nrow (M_cols), nrow (M)); -M = tab_M %*% M; -M = replace (target = M, pattern = "Infinity", replacement = "NaN"); - -# pull out non-empty rows from TEST -if (n_group_cols > 0 & n_stratum_cols > 0) { - M = append (append (G_cols, S_cols), M); - if (test_type != "none") { - TEST = append (G_cols_original, TEST); - } -} else if (n_group_cols > 0) { - M = append (G_cols, M); - if (test_type != "none") { - TEST = append (G_cols_original, TEST); - } -} else if (n_stratum_cols > 0) { - M = append (S_cols, M); -} - -# pull out non-empty columns from KM -KM = t (append (t (KM), KM_cols_select) * KM_cols_select); -KM = removeEmpty (target = KM, margin = "cols"); -KM = removeEmpty (target = KM, margin = "rows"); -KM = KM[1:(nrow (KM) - 1),]; - -# write output matrices -write (M, fileM, format=fmtO); -write (KM, fileO, format=fmtO); - -if (test_type != "none") { - if (num_groups > 1 & fileT != " ") { - write (TEST, fileT, format=fmtO); - write (TEST_GROUPS_OE, fileT+".groups.oe", format=fmtO); - } else { - print (str); - } +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +# +# THIS SCRIPT ANALYZES SURVIVAL DATA USING KAPLAN-MEIER ESTIMATES +# +# INPUT PARAMETERS: +# --------------------------------------------------------------------------------------------- +# NAME TYPE DEFAULT MEANING +# --------------------------------------------------------------------------------------------- +# X String --- Location to read the input matrix X containing the survival data: +# timestamps, whether event occurred (1) or data is censored (0), and a number of factors (categorical features) +# for grouping and/or stratifying +# TE String --- Column indices of X which contain timestamps (first entry) and event information (second entry) +# GI String --- Column indices of X corresponding to the factors to be used for grouping +# SI String --- Column indices of X corresponding to the factors to be used for stratifying +# O String --- Location to write the matrix containing the results of the Kaplan-Meier analysis; see below for the description +# M String --- Location to write Matrix M containing the following statistics: total number of events, median and its confidence intervals; +# if survival data for multiple groups and strata are provided, each row of M contains the above statistics per group and stratum +# T String " " If survival data from multiple groups is available and ttype=log-rank or wilcoxon, +# location to write the matrix containing the result of the (stratified) test for comparing multiple groups +# alpha Double 0.05 Parameter to compute 100*(1-alpha)% confidence intervals for the survivor function and its median +# etype String "greenwood" Parameter to specify the error type according to "greenwood" (the default) or "peto" +# ctype String "log" Parameter to modify the confidence interval; "plain" keeps the lower and upper bound of +# the confidence interval unmodified, "log" (the default) corresponds to logistic transformation and +# "log-log" corresponds to the complementary log-log transformation +# ttype String "none" If survival data for multiple groups is available, specifies which test to perform for comparing +# survival data across multiple groups: "none" (the default), "log-rank", or "wilcoxon" test +# fmt String "text" The output format of results of the Kaplan-Meier analysis, such as "text" or "csv" +# --------------------------------------------------------------------------------------------- +# OUTPUT: +# 1- Matrix KM whose dimension depends on the number of groups (denoted by g) and strata (denoted by s) in the data: +# each collection of 7 consecutive columns in KM corresponds to a unique combination of groups and strata in the data with the following schema +# 1. col: timestamp +# 2. col: no. at risk +# 3. col: no. of events +# 4. col: Kaplan-Meier estimate of survivor function surv +# 5. col: standard error of surv +# 6. col: lower 100*(1-alpha)% confidence interval for surv +# 7. 
col: upper 100*(1-alpha)% confidence interval for surv +# 2- Matrix M whose dimension depends on the number of groups (g) and strata (s) in the data (k denotes the number of factors used for grouping +# ,i.e., ncol(GI) and l denotes the number of factors used for stratifying, i.e., ncol(SI)) +# M[,1:k]: unique combination of values in the k factors used for grouping +# M[,(k+1):(k+l)]: unique combination of values in the l factors used for stratifying +# M[,k+l+1]: total number of records +# M[,k+l+2]: total number of events +# M[,k+l+3]: median of surv +# M[,k+l+4]: lower 100*(1-alpha)% confidence interval of the median of surv +# M[,k+l+5]: upper 100*(1-alpha)% confidence interval of the median of surv +# If the number of groups and strata is equal to 1, M will have 4 columns with +# M[,1]: total number of events +# M[,2]: median of surv +# M[,3]: lower 100*(1-alpha)% confidence interval of the median of surv +# M[,4]: upper 100*(1-alpha)% confidence interval of the median of surv +# 3- If survival data from multiple groups available and ttype=log-rank or wilcoxon, a 1 x 4 matrix T and an g x 5 matrix T_GROUPS_OE with +# T_GROUPS_OE[,1] = no. of events +# T_GROUPS_OE[,2] = observed value (O) +# T_GROUPS_OE[,3] = expected value (E) +# T_GROUPS_OE[,4] = (O-E)^2/E +# T_GROUPS_OE[,5] = (O-E)^2/V +# T[1,1] = no. of groups +# T[1,2] = degree of freedom for Chi-squared distributed test statistic +# T[1,3] = test statistic +# T[1,4] = P-value +# ------------------------------------------------------------------------------------------- +# HOW TO INVOKE THIS SCRIPT - EXAMPLE: +# hadoop jar SystemML.jar -f KM.dml -nvargs X=INPUT_DIR/X TE=INPUT_DIR/TE GI=INPUT_DIR/GI SI=INPUT_DIR/SI O=OUTPUT_DIR/O +# M=OUTPUT_DIR/M T=OUTPUT_DIR/T alpha=0.05 etype=greenwood ctype=log fmt=csv + +fileX = $X; +fileTE = $TE; +fileGI = ifdef ($GI, " "); +fileSI = ifdef ($SI, " "); +fileO = $O; +fileM = $M; + +# Default values of some parameters +fileT = ifdef ($T, " "); # $T=" " + +fileG = ifdef ($G, " "); # $G=" " +fileS = ifdef ($S, " "); # $S=" " +fmtO = ifdef ($fmt, "text"); # $fmt="text" +alpha = ifdef ($alpha, 0.05); # $alpha=0.05 +err_type = ifdef ($etype, "greenwood"); # $etype="greenwood" +conf_type = ifdef ($ctype, "log"); # $ctype="log" +test_type = ifdef ($ttype, "none"); # $ttype="none" + +X = read (fileX); +TE = read (fileTE); +if (fileGI != " ") { + GI = read (fileGI); +} else { + GI = matrix (0, rows = 1, cols = 1); +} + +if (fileSI != " "){ + SI = read (fileSI); +} else { + SI = matrix (0, rows = 1, cols = 1); +} + +TE = t(TE); +GI = t(GI); +SI = t(SI); + +# check arguments for validity +if (err_type != "greenwood" & err_type != "peto") { + stop (err_type + " is not a valid error type!"); +} + +if (conf_type != "plain" & conf_type != "log" & conf_type != "log-log") { + stop (conf_type + " is not a valid confidence type!"); +} + +if (test_type != "log-rank" & test_type != "wilcoxon" & test_type != "none") { + stop (test_type + " is not a valid test type!"); +} + +n_group_cols = ncol (GI); +n_stratum_cols = ncol (SI); + +# check GI and SI for validity +GI_1_1 = as.scalar (GI[1,1]); +SI_1_1 = as.scalar (SI[1,1]); +if (n_group_cols == 1) { + if (GI_1_1 == 0) { # no factors for grouping + n_group_cols = 0; + } +} else if (GI_1_1 == 0) { + stop ("Matrix GI contains zero entries!"); +} +if (n_stratum_cols == 1) { + if (SI_1_1 == 0) { # no factors for stratifying + n_stratum_cols = 0; + } +} else if (SI_1_1 == 0) { + stop ("Matrix SI contains zero entries!"); +} + +if (2 + n_group_cols + n_stratum_cols > ncol 
(X)) { + stop ("X has an incorrect number of columns!"); +} + +# reorder cols of X +if (GI_1_1 == 0 & SI_1_1 == 0) { + Is = TE; +} else if (GI_1_1 == 0) { + Is = append (TE, SI); +} else if (SI_1_1 == 0) { + Is = append (TE, GI); +} else { + Is = append (TE, append (GI, SI)); +} +X = X %*% table (Is, seq (1, 2 + n_group_cols + n_stratum_cols), ncol (X), 2 + n_group_cols + n_stratum_cols); + +num_records = nrow (X); +num_groups = 1; +num_strata = 1; + +### compute group id for each record +print ("Perform grouping..."); +if (n_group_cols > 0) { + for (g in 1:n_group_cols) { # sort columns corresponding to groups sequentially + X = order (target = X, by = 2 + g); + } + XG = X[,3:(3 + n_group_cols - 1)]; + Idx = matrix (1, rows = num_records, cols = 1); + Idx[2:num_records,] = rowMaxs (ppred (X[1:(num_records - 1),3:(2 + n_group_cols)], X[2:num_records,3:(2 + n_group_cols)], "!=")); + num_groups = sum (Idx); + + XG = replace (target = XG, pattern = 0, replacement = "Infinity"); + XG = XG * Idx; + XG = replace (target = XG, pattern = "NaN", replacement = 0); + G_cols = removeEmpty (target = XG, margin = "rows"); + G_cols = replace (target = G_cols, pattern = "Infinity", replacement = 0); + + A = removeEmpty (target = diag (Idx), margin = "cols"); + if (ncol (A) > 1) { + A[,1:(ncol (A) - 1)] = A[,1:(ncol (A) - 1)] - A[,2:ncol (A)]; + B = cumsum (A); + Gi = B %*% seq(1, ncol(B)); # group ids + } else { # there is only one group + Gi = matrix (1, rows = num_records, cols = 1); + } + if (n_stratum_cols > 0) { + X = append (append (X[,1:2],Gi), X[,(3 + g):ncol (X)]); + } else { # no strata + X = append (X[,1:2],Gi); + } +} + +### compute stratum id for each record +print ("Perform stratifying..."); +if (n_stratum_cols > 0) { + s_offset = 2; + if (n_group_cols > 0) { + s_offset = 3; + } + for (s in 1:n_stratum_cols) { # sort columns corresponding to strata sequentially + X = order (target = X, by = s_offset + s); + } + XS = X[,(s_offset + 1):(s_offset + n_stratum_cols)]; + Idx = matrix (1, rows = num_records, cols = 1); + Idx[2:num_records,] = rowMaxs (ppred (X[1:(num_records - 1),(s_offset + 1):(s_offset + n_stratum_cols)], X[2:num_records,(s_offset + 1):(s_offset + n_stratum_cols)], "!=")); + num_strata = sum (Idx); + + XS = replace (target = XS, pattern = 0, replacement = "Infinity"); + XS = XS * Idx; + XS = replace (target = XS, pattern = "NaN", replacement = 0); + S_cols = removeEmpty (target = XS, margin = "rows"); + S_cols = replace (target = S_cols, pattern = "Infinity", replacement = 0); + + SB = removeEmpty (target = seq (1,num_records), margin = "rows", select = Idx); # indices of stratum boundaries + A = removeEmpty (target = diag (Idx), margin = "cols"); + if (ncol (A) > 1) { + A[,1:(ncol (A) - 1)] = A[,1:(ncol (A) - 1)] - A[,2:ncol (A)]; + B = cumsum (A); + Si = B %*% seq(1, ncol(B)); # stratum ids + } else { # there is only one stratum + Si = matrix (1, rows = num_records, cols = 1); + } + X = append (X[,1:3],Si); +} + +if (n_group_cols == 0 & n_stratum_cols == 0) { + X = append (X, matrix (1, rows = num_records, cols = 2)); + SB = matrix (1, rows = 1, cols = 1); +} else if (n_group_cols == 0) { + X = append (X[,1:2], append (matrix (1, rows = num_records, cols = 1), X[,3])); +} else if (n_stratum_cols == 0) { + X = append (X, matrix (1, rows = num_records, cols = 1)); + SB = matrix (1, rows = 1, cols = 1); +} + +######## BEGIN KAPLAN-MEIER ANALYSIS +print ("BEGIN KAPLAN-MEIER SURVIVAL FIT SCRIPT"); + +KM = matrix (0, rows = num_records, cols = num_groups * num_strata * 7); 
+KM_cols_select = matrix (1, rows = num_groups * num_strata * 7, cols = 1); +GSI = matrix (0, rows = num_groups * num_strata, cols = 2); +a = 1/0; +M = matrix (a, rows = num_groups * num_strata, cols = 5); +M_cols = seq (1, num_groups * num_strata); +z_alpha_2 = icdf (target = 1 - alpha / 2, dist = "normal"); + +if (num_groups > 1 & test_type != "none") { + str = ""; + TEST = matrix (0, rows = num_groups, cols = 5); + TEST_GROUPS_OE = matrix (0, rows = 1, cols = 4); + U = matrix (0, rows = num_groups, cols = num_strata); + U_OE = matrix (0, rows = num_groups, cols = num_strata); + OBS = matrix (0, rows = num_groups, cols = num_strata); + EXP = matrix (0, rows = num_groups, cols = num_strata); + V_sum_total = matrix (0, rows = num_groups-1, cols = (num_groups-1) * num_strata); + n_event_all_global = matrix(1, rows=num_groups, cols=num_strata); +} else if (num_groups == 1 & test_type != "none") { + stop ("Data contains only one group or no groups, at least two groups are required for test!"); +} + +parfor (s in 1:num_strata, check = 0) { + + start_ind = as.scalar (SB[s,]); + end_ind = num_records; + if (s != num_strata) { + end_ind = as.scalar (SB[s + 1,]) - 1; + } + + ######## RECODING TIMESTAMPS PRESERVING THE ORDER + + X_cur = X[start_ind:end_ind,]; + range = end_ind - start_ind + 1; + X_cur = order (target = X_cur, by = 1); + Idx1 = matrix (1, rows = range, cols = 1); + + num_timestamps = 1; + if (range == 1) { + RT = matrix (1, rows = 1, cols = 1); + } else { + Idx1[2:range,1] = ppred (X_cur[1:(range - 1),1], X_cur[2:range,1], "!="); + num_timestamps = sum (Idx1); + A1 = removeEmpty (target = diag (Idx1), margin = "cols"); + if (ncol (A1) > 1) { + A1[,1:(ncol (A1) - 1)] = A1[,1:(ncol (A1) - 1)] - A1[,2:ncol (A1)]; + B1 = cumsum (A1); + RT = B1 %*% seq(1, ncol(B1)); + } else { # there is only one group + RT = matrix (1, rows = range, cols = 1); + } + } + + T = X_cur[,1]; + E = X_cur[,2]; + G = X_cur[,3]; + S = X_cur[,4]; + + n_event_stratum = aggregate (target = E, groups = RT, fn = "sum"); # no. of uncensored events per stratum + n_event_all_stratum = aggregate (target = E, groups = RT, fn = "count"); # no. both censored and uncensored of events per stratum + Idx1 = cumsum (n_event_all_stratum); + time_stratum = table (seq (1, nrow (Idx1), 1), Idx1) %*% T; # distinct timestamps both censored and uncensored per stratum + time_stratum_has_zero = sum (ppred (time_stratum, 0, "==")) > 0; + if (time_stratum_has_zero) { + time_stratum = replace (target = time_stratum, pattern = 0, replacement = "Infinity"); + } + n_time_all1 = nrow (n_event_stratum); # no. of distinct timestamps both censored and uncensored per stratum + n_event_all_stratum_agg = matrix (0, rows = n_time_all1, cols = 1); + if (n_time_all1 > 1) { + n_event_all_stratum_agg[2:n_time_all1,] = Idx1[1:(n_time_all1 - 1),]; + } + n_risk_stratum = range - n_event_all_stratum_agg; # no. at risk per stratum + + if (num_groups > 1 & test_type != "none") { # needed for log-rank or wilcoxon test + n_risk_n_event_stratum = matrix (0, rows = n_time_all1, cols = num_groups * 2); + } + + parfor (g in 1:num_groups, check = 0) { + + group_ind = ppred (G, g, "=="); + KM_offset = (s - 1) * num_groups * 7 + (g - 1) * 7; + M_offset = (s - 1) * num_groups + g; + if (sum (group_ind) != 0) { # group g is present in the stratum s + + GSI_offset = (s - 1) * num_groups + g; + GSI[GSI_offset,1] = g; + GSI[GSI_offset,2] = s; + E_cur = E * group_ind; + + ######## COMPUTE NO. 
AT RISK AND NO. OF EVENTS FOR EACH TIMESTAMP + + n_event = aggregate (target = E_cur, groups = RT, fn = "sum"); # no. of uncensored events per stratum per group + n_event_all = aggregate (target = group_ind, groups = RT, fn = "sum"); # no. of both censored and uncensored events per stratum per group + Idx1 = cumsum (n_event_all); + event_occurred = ppred (n_event, 0, ">"); + if (time_stratum_has_zero) { + time = replace (target = time_stratum * event_occurred, pattern = "NaN", replacement = 0); + time = removeEmpty (target = time, margin = "rows"); + time = replace (target = time, pattern = "Infinity", replacement = 0); + } else { + time = removeEmpty (target = time_stratum * event_occurred, margin = "rows"); + } + n_time_all2 = nrow (n_event); # no. of distinct timestamps both censored and uncensored per stratum per group + n_event_all_agg = matrix (0, rows = n_time_all2, cols = 1); + if (n_time_all2 > 1) { + n_event_all_agg[2:n_time_all2,] = Idx1[1:(n_time_all2 - 1),]; + } + + n_risk = sum (group_ind) - n_event_all_agg; # no. at risk per stratum per group + + if (num_groups > 1 & test_type != "none") { + n_risk_n_event_stratum[,(g - 1) * 2 + 1] = n_risk; + n_risk_n_event_stratum[,(g - 1) * 2 + 2] = n_event; + } + + # Extract only rows corresponding to events, i.e., for which n_event is nonzero + Idx1 = ppred (n_event, 0, "!="); + KM_1 = matrix (0, rows = n_time_all2, cols = 2); + KM_1[,1] = n_risk; + KM_1[,2] = n_event; + KM_1 = removeEmpty (target = KM_1, margin = "rows", select = Idx1); + n_risk = KM_1[,1]; + n_event = KM_1[,2]; + n_time = nrow (time); + + ######## ESTIMATE SURVIVOR FUNCTION SURV, ITS STANDARD ERROR SE_SURV, AND ITS 100(1-ALPHA)% CONFIDENCE INTERVAL + surv = cumprod ((n_risk - n_event) / n_risk); + tmp = n_event / (n_risk * (n_risk - n_event)); + se_surv = sqrt (cumsum (tmp)) * surv; + if (err_type == "peto") { + se_surv = (surv * sqrt(1 - surv) / sqrt(n_risk)); + } + + if (conf_type == "plain") { + # True survivor function is in [surv +- z_alpha_2 * se_surv], + # values less than 0 are replaced by 0, values larger than 1 are replaced by 1! + CI_l = max (surv - (z_alpha_2 * se_surv), 0); + CI_r = min (surv + (z_alpha_2 * se_surv), 1); + } else if (conf_type == "log") { + # True survivor function is in [surv * exp(+- z_alpha_2 * se_surv / surv)] + CI_l = max (surv * exp (- z_alpha_2 * se_surv / surv), 0); + CI_r = min (surv * exp ( z_alpha_2 * se_surv / surv), 1); + } else { # conf_type == "log-log" + # True survivor function is in [surv ^ exp(+- z_alpha_2 * se(log(-log(surv))))] + CI_l = max (surv ^ exp (- z_alpha_2 * se_surv / log(surv)), 0); + CI_r = min (surv ^ exp ( z_alpha_2 * se_surv / log(surv)), 1); + } + # + if (as.scalar (n_risk[n_time,]) == as.scalar (n_event[n_time,])) { + CI_l[n_time,] = 0/0; + CI_r[n_time,] = 0/0; + } + + n_event_sum = sum (n_event); + n_event_sum_all = sum(n_event_all); + if (n_event_sum > 0) { + # KM_offset = (s - 1) * num_groups * 7 + (g - 1) * 7; + KM[1:n_time,KM_offset + 1] = time; + KM[1:n_time,KM_offset + 2] = n_risk; + KM[1:n_time,KM_offset + 3] = n_event; + KM[1:n_time,KM_offset + 4] = surv; + KM[1:n_time,KM_offset + 5] = se_surv; + KM[1:n_time,KM_offset + 6] = CI_l; + KM[1:n_time,KM_offset + 7] = CI_r; + } + + ######## ESTIMATE MEDIAN OF SURVIVAL TIMES AND ITS 100(1-ALPHA)% CONFIDENCE INTERVAL + + p_5 = ppred (surv, 0.5, "<="); + pn_5 = sum (p_5); + #M_offset = (s - 1) * num_groups + g; + # if the estimated survivor function is larger than 0.5 for all timestamps, the median does not exist! 
+ p_5_exists = (pn_5 != 0); + M[M_offset,2] = n_event_sum; + M[M_offset,1] = n_event_sum_all; + if (p_5_exists) { + if ( as.scalar (surv[n_time - pn_5 + 1,1]) == 0.5 ) { # if the estimated survivor function is exactly equal to 0.5 + if (pn_5 > 1) { + t_5 = as.scalar ((time[n_time - pn_5 + 1,1] + time[n_time - pn_5 + 2,1])/2); + } else { + t_5 = as.scalar (time[n_time - pn_5 + 1,1]); + } + } else { + t_5 = as.scalar (time[n_time - pn_5 + 1,1]); + } + + l_ind = ppred (CI_l, 0.5, "<="); + r_ind = ppred (CI_r, 0.5, "<="); + l_ind_sum = sum (l_ind); + r_ind_sum = sum (r_ind); + l_min_ind = as.scalar (rowIndexMin (t(l_ind))); + r_min_ind = as.scalar (rowIndexMin (t(r_ind))); + if (l_min_ind == n_time) { + if (l_ind_sum > 0) { + if (as.scalar (l_ind[n_time,1]) == 0) { # NA at last position + M[M_offset,4] = time[n_time - l_ind_sum,1]; + } else { + M[M_offset,4] = time[1,1]; + } + } + } else { + M[M_offset,4] = time[l_min_ind + 1,1]; + } + # + if (r_min_ind == n_time) { + if (r_ind_sum > 0) { + if (as.scalar (r_ind[n_time,1]) == 0) { # NA at last position + M[M_offset,5] = time[n_time - r_ind_sum,1]; + } else { + M[M_offset,5] = time[1,1]; + } + } + } else { + M[M_offset,5] = time[r_min_ind + 1,1]; + } + M[M_offset,3] = t_5; + if (test_type != "none"){ + n_event_all_global[g,s] = n_event_sum_all; + } + } + } else { + print ("group " + g + " is not present in the stratum " + s); + KM_cols_select[(KM_offset + 1):(KM_offset + 7),1] = matrix (0, rows = 7, cols = 1); + M_cols[M_offset,1] = 0; + } + } + + + ######## COMPARISON BETWEEN DIFFERENT GROUPS USING LOG-RANK OR WILCOXON TEST + + if (num_groups > 1 & test_type != "none") { + + V = matrix (0, rows = num_groups-1, cols = num_groups-1); + parfor (g in 0:(num_groups-1), check = 0) { + + n_risk = n_risk_n_event_stratum[,g * 2 + 1]; + n_event = n_risk_n_event_stratum[,g * 2 + 2]; + + if (test_type == "log-rank") { + O = n_event; + E = n_risk * n_event_stratum / n_risk_stratum; + } else { ### test_type == "wilcoxon" + O = n_risk_stratum * n_event / range; + E = n_risk * n_event_stratum / range; + } + U[(g + 1),s] = sum (O - E); + U_OE[g + 1, s] = (sum (O - E)*sum (O - E))/sum(E); + OBS[g + 1, s] = sum(O); + EXP[g + 1, s] = sum(E); + } + + # parfor (i1 in 0:(num_groups - 2), check = 0) { + for (i1 in 0:(num_groups - 2), check = 0) { + + n_risk = n_risk_n_event_stratum[,1 + i1 * 2]; + n_event = n_risk_n_event_stratum[,2 + i1 * 2]; + for (i2 in 0:(num_groups - 2)) { + + n_risk_i2j = n_risk_n_event_stratum[,1 + i2 * 2]; + I_i1i2 = 0; + if (i1 == i2) { + I_i1i2 = 1; + } + if (test_type == "log-rank") { + V1 = n_risk * n_event_stratum * (n_risk_stratum - n_event_stratum) / (n_risk_stratum * (n_risk_stratum - 1)); + V1 = replace (target = V1, pattern = "NaN", replacement = 0); + V2 = I_i1i2 - (n_risk_i2j / n_risk_stratum); + V[(i1 + 1),(i2 + 1)] = sum (V1 * V2); + } else { ### test_type == "wilcoxon" + V1 = (n_risk_stratum ^ 2) * (n_risk * n_event_stratum) * (n_risk_stratum - n_event_stratum) / (n_risk_stratum * (n_risk_stratum - 1)); + V1 = replace (target = V1, pattern = "NaN", replacement = 0); + V2 = I_i1i2 - (n_risk_i2j / n_risk_stratum); + V[(i1 + 1),(i2 + 1)] = sum (V1 * V2) / (range ^ 2); + } + } + } + V_start_ind = (s - 1) * (num_groups - 1) + 1; + V_sum_total[,V_start_ind:(V_start_ind + num_groups - 2)] = V; + } +} + +if (num_groups > 1 & test_type != "none") { + V_sum = matrix (0, rows = num_groups-1, cols = num_groups-1); + for (s in 1:num_strata) { + V_start_ind = (s - 1) * (num_groups - 1) + 1; + V_sum_total_part = 
V_sum_total[,V_start_ind:(V_start_ind + num_groups - 2)]; + V_sum = V_sum + V_sum_total_part; + } + + U_sum = rowSums (U); + + test_st = as.scalar (t(U_sum[1:(num_groups-1),1]) %*% inv(V_sum) %*% U_sum[1:(num_groups-1),1]); + p_val = 1 - cdf (target = test_st, dist = "chisq", df = num_groups-1 ); + if (test_type != "none") { + U_OE_sum = rowSums(U_OE); + V_OE =rowSums((U*U) /sum(V_sum)); + TEST_GROUPS_OE[1,1] = num_groups; + TEST_GROUPS_OE[1,2] = num_groups - 1; + TEST_GROUPS_OE[1,3] = test_st; + TEST_GROUPS_OE[1,4] = p_val; + TEST[,1] = rowSums(n_event_all_global); + TEST[,2] = rowSums(OBS); + TEST[,3] = rowSums(EXP); + TEST[,4] = rowSums(U_OE_sum); + TEST[,5] = rowSums(V_OE); + str = append (str, test_type + " test for " + num_groups + " groups: Chi-squared = " + test_st + " on " + (num_groups - 1) + " df, p = " + p_val + " "); + } +} + +GSI = removeEmpty (target = GSI, margin = "rows"); +if (n_group_cols > 0) { + # making a copy of unique groups before adding new rows depending on strata + G_cols_original = G_cols; + + GSI_1 = GSI[,1]; + tab_G = table (seq (1, nrow (GSI_1)), GSI_1, nrow (GSI_1), nrow (G_cols)); + G_cols = tab_G %*% G_cols; +} + +if (n_stratum_cols > 0) { + GSI_2 = GSI[,2]; + tab_S = table (seq (1, nrow (GSI_2)), GSI_2, nrow (GSI_2), nrow (S_cols)); + S_cols = tab_S %*% S_cols; +} + +# pull out non-empty rows from M +M_cols = removeEmpty (target = M_cols, margin = "rows"); +tab_M = table (seq (1, nrow (M_cols)), M_cols, nrow (M_cols), nrow (M)); +M = tab_M %*% M; +M = replace (target = M, pattern = "Infinity", replacement = "NaN"); + +# pull out non-empty rows from TEST +if (n_group_cols > 0 & n_stratum_cols > 0) { + M = append (append (G_cols, S_cols), M); + if (test_type != "none") { + TEST = append (G_cols_original, TEST); + } +} else if (n_group_cols > 0) { + M = append (G_cols, M); + if (test_type != "none") { + TEST = append (G_cols_original, TEST); + } +} else if (n_stratum_cols > 0) { + M = append (S_cols, M); +} + +# pull out non-empty columns from KM +KM = t (append (t (KM), KM_cols_select) * KM_cols_select); +KM = removeEmpty (target = KM, margin = "cols"); +KM = removeEmpty (target = KM, margin = "rows"); +KM = KM[1:(nrow (KM) - 1),]; + +# write output matrices +write (M, fileM, format=fmtO); +write (KM, fileO, format=fmtO); + +if (test_type != "none") { + if (num_groups > 1 & fileT != " ") { + write (TEST, fileT, format=fmtO); + write (TEST_GROUPS_OE, fileT+".groups.oe", format=fmtO); + } else { + print (str); + } } \ No newline at end of file
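For reference, the survivor-function columns written to KM above come down to a handful of vectorized lines: given the number at risk (n_risk) and the number of events (n_event) at each distinct event time, the script computes surv = cumprod((n_risk - n_event) / n_risk) and, for etype="greenwood", se_surv = surv * sqrt(cumsum(n_event / (n_risk * (n_risk - n_event)))). The following standalone DML sketch replays those formulas on a toy life table; the counts, alpha, and the "plain" confidence type are invented for illustration, not taken from any real data set:

  # Toy life table (invented): 5 distinct event times, 10 subjects at the start,
  # one group and one stratum, so none of the ordering/grouping machinery is needed.
  n_risk  = 12 - 2 * seq (1, 5);             # 10, 8, 6, 4, 2 subjects at risk
  n_event = matrix (1, rows = 5, cols = 1);  # one event per time ...
  n_event[1,1] = 2;                          # ... except two at the first time
  alpha = 0.05;
  z_alpha_2 = icdf (target = 1 - alpha / 2, dist = "normal");

  # Kaplan-Meier product-limit estimate, as in KM.dml
  surv = cumprod ((n_risk - n_event) / n_risk);
  # Greenwood standard error (etype = "greenwood")
  se_surv = sqrt (cumsum (n_event / (n_risk * (n_risk - n_event)))) * surv;
  # "plain" 100*(1-alpha)% confidence interval, clipped to [0, 1]
  CI_l = max (surv - (z_alpha_2 * se_surv), 0);
  CI_r = min (surv + (z_alpha_2 * se_surv), 1);
  print ("S(t_1) = " + as.scalar (surv[1,1]) + ", se = " + as.scalar (se_surv[1,1]));

For multiple groups with ttype=log-rank, the per-group quantity the script accumulates is U = sum over event times of (observed - expected) events, with expected = n_risk * n_event_stratum / n_risk_stratum; the final statistic t(U_sum) %*% inv(V_sum) %*% U_sum is then referred to a chi-squared distribution with num_groups - 1 degrees of freedom, which is exactly the cdf call near the end of the script.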
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/scripts/algorithms/Kmeans-predict.dml ---------------------------------------------------------------------- diff --git a/scripts/algorithms/Kmeans-predict.dml b/scripts/algorithms/Kmeans-predict.dml index 8496585..5bd78bd 100644 --- a/scripts/algorithms/Kmeans-predict.dml +++ b/scripts/algorithms/Kmeans-predict.dml @@ -1,339 +1,339 @@ -#------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -#------------------------------------------------------------- - -# -# Compares two categorical data vectors (presumed to be clusterings) and -# counts matching/nonmatching same-cluster/different-cluster pairs of rows -# -# INPUT PARAMETERS: -# --------------------------------------------------------------------------- -# NAME TYPE DEFAULT MEANING -# --------------------------------------------------------------------------- -# spY String " " Location to read a column-vector with the "specified" -# assignment of records (rows) to categories (clusters) -# prY String " " Location to read (or write, if X and C are present) a -# column-vector with the "predicted" assignment of rows -# to clusters. NOTE: The same category may be labeled -# differently in each of the two vectors, spY and prY. -# fmt String "text" Matrix output format for prY, usually "text" or "csv" -# X String " " Location to read matrix X with the input data records -# C String " " Location to read matrix C with the cluster centroids -# NOTE: If X and C are present, prY is an output file. -# O String " " Location to write the printed output statistics -# --------------------------------------------------------------------------- -# -# The "O" file provides the output statistics in CSV format, one per line, in -# the following format: NAME, [CID], VALUE. Note: -# - The 1st group statistics are given if X input is available; -# - The 2nd group statistics are given if X and C inputs are available; -# - The 3rd and 4th group statistics are given if spY input is available; -# - Only the 4th group statistics contain a nonempty CID value; -# - When present, CID contains either the specified category label or the -# predicted cluster label. 
-# -# NAME CID MEANING -# --------------------------------------------------------------------------- -# TSS Total Sum of Squares (from the total mean) -# WCSS_M Within-Cluster Sum of Squares (means as centers) -# WCSS_M_PC Within-Cluster Sum of Squares (means), in % of TSS -# BCSS_M Between-Cluster Sum of Squares (means as centers) -# BCSS_M_PC Between-Cluster Sum of Squares (means), in % of TSS -# -# WCSS_C Within-Cluster Sum of Squares (centroids as centers) -# WCSS_C_PC Within-Cluster Sum of Squares (centroids), % of TSS -# BCSS_C Between-Cluster Sum of Squares (centroids as centers) -# BCSS_C_PC Between-Cluster Sum of Squares (centroids), % of TSS -# -# TRUE_SAME_CT Same-category pairs predicted as Same-cluster, count -# TRUE_SAME_PC Same-category pairs predicted as Same-cluster, % -# TRUE_DIFF_CT Diff-category pairs predicted as Diff-cluster, count -# TRUE_DIFF_PC Diff-category pairs predicted as Diff-cluster, % -# FALSE_SAME_CT Diff-category pairs predicted as Same-cluster, count -# FALSE_SAME_PC Diff-category pairs predicted as Same-cluster, % -# FALSE_DIFF_CT Same-category pairs predicted as Diff-cluster, count -# FALSE_DIFF_PC Same-category pairs predicted as Diff-cluster, % -# -# SPEC_TO_PRED + For specified category, the best predicted cluster id -# SPEC_FULL_CT + For specified category, its full count -# SPEC_MATCH_CT + For specified category, best-cluster matching count -# SPEC_MATCH_PC + For specified category, % of matching to full count -# PRED_TO_SPEC + For predicted cluster, the best specified category id -# PRED_FULL_CT + For predicted cluster, its full count -# PRED_MATCH_CT + For predicted cluster, best-category matching count -# PRED_MATCH_PC + For predicted cluster, % of matching to full count -# --------------------------------------------------------------------------- -# -# Examples: -# 1. To predict Y given X and C: -# hadoop jar SystemML.jar -f Kmeans-predict.dml -nvargs X=INPUT_DIR/X -# C=INPUT_DIR/C prY=OUTPUT_DIR/PredY O=OUTPUT_DIR/stats -# 2. To compare "actual" labels spY with "predicted" labels given X and C: -# hadoop jar SystemML.jar -f Kmeans-predict.dml -nvargs X=INPUT_DIR/X -# C=INPUT_DIR/C spY=INPUT_DIR/Y O=OUTPUT_DIR/stats -# 3. 
To compare "actual" labels spY with given "predicted" labels prY: -# hadoop jar SystemML.jar -f Kmeans-predict.dml -nvargs spY=INPUT_DIR/Y -# prY=INPUT_DIR/PredY O=OUTPUT_DIR/stats - - -fmt_prY = ifdef ($fmt, "text"); -filePrY = ifdef ($prY, " "); -fileSpY = ifdef ($spY, " "); -fileX = ifdef ($X, " "); -fileC = ifdef ($C, " "); -fileO = ifdef ($O, " "); - -is_str_empty = TRUE; -str = " "; - -print ("BEGIN K-MEANS SCORING SCRIPT"); - -if (fileX != " ") { - print ("Reading X..."); - X = read (fileX); - total_mean = colSums (X) / nrow (X); - total_ss = sum( (X - total_mean)^2 ); -} - -if ((fileC != " ") & (fileX == " ")) { - print ("ERROR: Cannot provide C without providing X."); -} else { - - -if (fileC != " ") { - print ("Reading C..."); - C = read (fileC); - num_clusters = nrow (C); - ones_C = matrix (1, rows = num_clusters, cols = 1); - print ("Computing the predicted Y..."); - D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2)); - prY = rowIndexMin (D); - if (filePrY != " ") { - print ("Writing the predicted Y..."); - write (prY, filePrY, format=fmt_prY); - } -} else { - print ("Reading the predicted Y..."); - prY = read (filePrY); - num_clusters = max (prY); - ones_C = matrix (1, rows = num_clusters, cols = 1); -} - -if (fileX != " ") { - print ("Computing the WCSS..."); - # Compute projection matrix from clusters to records - P = matrix (0, rows = nrow (X), cols = num_clusters); - P [, 1 : max (prY)] = table (seq (1, nrow (X), 1), prY); - # Compute the means, as opposed to the centroids - cluster_sizes = t(colSums (P)); - record_of_ones = matrix (1, rows = 1, cols = ncol (X)); - M = (t(P) %*% X) / ((cluster_sizes + ppred (cluster_sizes, 0, "==")) %*% record_of_ones); - # Compute the WCSS for the means - wcss_means = sum ((X - P %*% M) ^ 2); - wcss_means_pc = 100.0 * wcss_means / total_ss; - bcss_means = sum (cluster_sizes * rowSums ((M - ones_C %*% total_mean) ^ 2)); - bcss_means_pc = 100.0 * bcss_means / total_ss; - # Output results - print ("Total Sum of Squares (TSS) = " + total_ss); - print ("WCSS for cluster means: " + (round (10000.0 * wcss_means_pc) / 10000.0) + "% of TSS = " + wcss_means); - print ("BCSS for cluster means: " + (round (10000.0 * bcss_means_pc) / 10000.0) + "% of TSS = " + bcss_means); - str = "TSS,," + total_ss; - str = append (str, "WCSS_M,," + wcss_means); - str = append (str, "WCSS_M_PC,," + wcss_means_pc); - str = append (str, "BCSS_M,," + bcss_means); - str = append (str, "BCSS_M_PC,," + bcss_means_pc); - is_str_empty = FALSE; -} - -if (fileC != " ") { - # Compute the WCSS for the centroids - wcss_centroids = sum ((X - P %*% C) ^ 2); - wcss_centroids_pc = 100.0 * wcss_centroids / total_ss; - bcss_centroids = sum (cluster_sizes * rowSums ((C - ones_C %*% total_mean) ^ 2)); - bcss_centroids_pc = 100.0 * bcss_centroids / total_ss; - # Output results - print ("WCSS for centroids: " + (round (10000.0 * wcss_centroids_pc) / 10000.0) + "% of TSS = " + wcss_centroids); - print ("BCSS for centroids: " + (round (10000.0 * bcss_centroids_pc) / 10000.0) + "% of TSS = " + bcss_centroids); - str = append (str, "WCSS_C,," + wcss_centroids); - str = append (str, "WCSS_C_PC,," + wcss_centroids_pc); - str = append (str, "BCSS_C,," + bcss_centroids); - str = append (str, "BCSS_C_PC,," + bcss_centroids_pc); -} - - - -if (fileSpY != " ") { - -print ("Reading the specified Y..."); -spY = read (fileSpY); -num_records = nrow (spY); - -if (num_records != nrow (prY) | ncol (spY) != 1 | ncol (prY) != 1) { - print ("ERROR: spY and/or prY size mismatch"); - print ("nrow (spY) = " + nrow (spY) + 
"; ncol (spY) = " + ncol (spY) - + "; nrow (prY) = " + nrow (prY) + "; ncol (prY) = " + ncol (prY)); -} else { - - print ("Computing the pairs counts..."); - - orig_min_spY = min (spY); - orig_min_prY = min (prY); - spY = spY + (1 - orig_min_spY); - prY = prY + (1 - orig_min_prY); - - spYprY_row_counts = table (spY, prY); - spY_row_counts = rowSums (spYprY_row_counts); - prY_row_counts = t(colSums (spYprY_row_counts)); - - # Count all pairs of rows having the same (spY, prY)-values - spYprY_pair_counts = spYprY_row_counts * (spYprY_row_counts - 1) / 2; - - # Count all pairs of rows having the same spY-values - spY_pair_counts = spY_row_counts * (spY_row_counts - 1) / 2; - # Count all pairs of rows having the same prY-values - prY_pair_counts = prY_row_counts * (prY_row_counts - 1) / 2; - - num_pairs = num_records * (num_records - 1.0) / 2.0; - - num_TP_pairs = sum (spYprY_pair_counts); - num_FP_pairs = sum (prY_pair_counts) - num_TP_pairs; - num_FN_pairs = sum (spY_pair_counts) - num_TP_pairs; - num_TN_pairs = num_pairs - num_TP_pairs - num_FP_pairs - num_FN_pairs; - - pct_TP_pairs = num_TP_pairs / num_pairs * 100.0; - pct_TN_pairs = num_TN_pairs / num_pairs * 100.0; - pct_FP_pairs = num_FP_pairs / num_pairs * 100.0; - pct_FN_pairs = num_FN_pairs / num_pairs * 100.0; - - if (is_str_empty) { - str = "TRUE_SAME_CT,," + num_TP_pairs; - is_str_empty = FALSE; - } else { - str = append (str, "TRUE_SAME_CT,," + num_TP_pairs); - } - str = append (str, "TRUE_SAME_PC,," + pct_TP_pairs); - str = append (str, "TRUE_DIFF_CT,," + num_TN_pairs); - str = append (str, "TRUE_DIFF_PC,," + pct_TN_pairs); - str = append (str, "FALSE_SAME_CT,," + num_FP_pairs); - str = append (str, "FALSE_SAME_PC,," + pct_FP_pairs); - str = append (str, "FALSE_DIFF_CT,," + num_FN_pairs); - str = append (str, "FALSE_DIFF_PC,," + pct_FN_pairs); - - pct_TP_pairs = round (pct_TP_pairs * 10000.0) / 10000.0; - pct_TN_pairs = round (pct_TN_pairs * 10000.0) / 10000.0; - pct_FP_pairs = round (pct_FP_pairs * 10000.0) / 10000.0; - pct_FN_pairs = round (pct_FN_pairs * 10000.0) / 10000.0; - - space_TP = ""; if (pct_TP_pairs < 100) {space_TP = " ";} if (pct_TP_pairs < 10) {space_TP = " ";} - space_TN = ""; if (pct_TN_pairs < 100) {space_TN = " ";} if (pct_TN_pairs < 10) {space_TN = " ";} - space_FP = ""; if (pct_FP_pairs < 100) {space_FP = " ";} if (pct_FP_pairs < 10) {space_FP = " ";} - space_FN = ""; if (pct_FN_pairs < 100) {space_FN = " ";} if (pct_FN_pairs < 10) {space_FN = " ";} - - print ("Same-cluster pairs predicted as Same-cluster ( True Pos): " + space_TP - + pct_TP_pairs + "% of all pairs" + " (" + num_TP_pairs + ")"); - print ("Diff-cluster pairs predicted as Diff-cluster ( True Neg): " + space_TN - + pct_TN_pairs + "% of all pairs" + " (" + num_TN_pairs + ")"); - print ("Diff-cluster pairs predicted as Same-cluster (False Pos): " + space_FP - + pct_FP_pairs + "% of all pairs" + " (" + num_FP_pairs + ")"); - print ("Same-cluster pairs predicted as Diff-cluster (False Neg): " + space_FN - + pct_FN_pairs + "% of all pairs" + " (" + num_FN_pairs + ")"); - - [spY_cids, prY_cids, full_counts, matching_counts, rounded_percentages] = - get_best_assignments (spYprY_row_counts); - - print (" "); - print ("Specified Categories versus Predicted Clusters:"); - - spY_cids = spY_cids + orig_min_spY - 1; - prY_cids = prY_cids + orig_min_prY - 1; - - for (i in 1 : nrow (spY_cids)) - { - cid = as.integer (castAsScalar (spY_cids [i, 1])); - pct = castAsScalar (rounded_percentages [i, 1]); - space_pct = ""; if (pct < 100) {space_pct = " ";} if (pct 
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#
+# Compares two categorical data vectors (presumed to be clusterings) and
+# counts matching/nonmatching same-cluster/different-cluster pairs of rows
+#
+# INPUT PARAMETERS:
+# ---------------------------------------------------------------------------
+# NAME  TYPE    DEFAULT  MEANING
+# ---------------------------------------------------------------------------
+# spY   String  " "      Location to read a column-vector with the "specified"
+#                        assignment of records (rows) to categories (clusters)
+# prY   String  " "      Location to read (or write, if X and C are present) a
+#                        column-vector with the "predicted" assignment of rows
+#                        to clusters.  NOTE: The same category may be labeled
+#                        differently in each of the two vectors, spY and prY.
+# fmt   String  "text"   Matrix output format for prY, usually "text" or "csv"
+# X     String  " "      Location to read matrix X with the input data records
+# C     String  " "      Location to read matrix C with the cluster centroids
+#                        NOTE: If X and C are present, prY is an output file.
+# O     String  " "      Location to write the printed output statistics
+# ---------------------------------------------------------------------------
+#
+# The "O" file provides the output statistics in CSV format, one per line, in
+# the following format: NAME, [CID], VALUE.  Note:
+#   - The 1st group statistics are given if X input is available;
+#   - The 2nd group statistics are given if X and C inputs are available;
+#   - The 3rd and 4th group statistics are given if spY input is available;
+#   - Only the 4th group statistics contain a nonempty CID value;
+#   - When present, CID contains either the specified category label or the
+#     predicted cluster label.
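+#
+# For illustration only (the numbers below are made up, not produced by any
+# real run), the first few lines of the "O" file when X and C are given
+# would follow the NAME,[CID],VALUE pattern like so:
+#
+#   TSS,,1024.0
+#   WCSS_M,,256.0
+#   WCSS_M_PC,,25.0
+#   SPEC_TO_PRED,2,5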
+#
+# NAME            CID  MEANING
+# ---------------------------------------------------------------------------
+# TSS                  Total Sum of Squares (from the total mean)
+# WCSS_M               Within-Cluster Sum of Squares (means as centers)
+# WCSS_M_PC            Within-Cluster Sum of Squares (means), in % of TSS
+# BCSS_M               Between-Cluster Sum of Squares (means as centers)
+# BCSS_M_PC            Between-Cluster Sum of Squares (means), in % of TSS
+#
+# WCSS_C               Within-Cluster Sum of Squares (centroids as centers)
+# WCSS_C_PC            Within-Cluster Sum of Squares (centroids), % of TSS
+# BCSS_C               Between-Cluster Sum of Squares (centroids as centers)
+# BCSS_C_PC            Between-Cluster Sum of Squares (centroids), % of TSS
+#
+# TRUE_SAME_CT         Same-category pairs predicted as Same-cluster, count
+# TRUE_SAME_PC         Same-category pairs predicted as Same-cluster, %
+# TRUE_DIFF_CT         Diff-category pairs predicted as Diff-cluster, count
+# TRUE_DIFF_PC         Diff-category pairs predicted as Diff-cluster, %
+# FALSE_SAME_CT        Diff-category pairs predicted as Same-cluster, count
+# FALSE_SAME_PC        Diff-category pairs predicted as Same-cluster, %
+# FALSE_DIFF_CT        Same-category pairs predicted as Diff-cluster, count
+# FALSE_DIFF_PC        Same-category pairs predicted as Diff-cluster, %
+#
+# SPEC_TO_PRED     +   For specified category, the best predicted cluster id
+# SPEC_FULL_CT     +   For specified category, its full count
+# SPEC_MATCH_CT    +   For specified category, best-cluster matching count
+# SPEC_MATCH_PC    +   For specified category, % of matching to full count
+# PRED_TO_SPEC     +   For predicted cluster, the best specified category id
+# PRED_FULL_CT     +   For predicted cluster, its full count
+# PRED_MATCH_CT    +   For predicted cluster, best-category matching count
+# PRED_MATCH_PC    +   For predicted cluster, % of matching to full count
+# ---------------------------------------------------------------------------
+#
+# Examples:
+# 1. To predict Y given X and C:
+#     hadoop jar SystemML.jar -f Kmeans-predict.dml -nvargs X=INPUT_DIR/X
+#         C=INPUT_DIR/C prY=OUTPUT_DIR/PredY O=OUTPUT_DIR/stats
+# 2. To compare "actual" labels spY with "predicted" labels given X and C:
+#     hadoop jar SystemML.jar -f Kmeans-predict.dml -nvargs X=INPUT_DIR/X
+#         C=INPUT_DIR/C spY=INPUT_DIR/Y O=OUTPUT_DIR/stats
+# 3. To compare "actual" labels spY with given "predicted" labels prY:
+#     hadoop jar SystemML.jar -f Kmeans-predict.dml -nvargs spY=INPUT_DIR/Y
+#         prY=INPUT_DIR/PredY O=OUTPUT_DIR/stats
+
+
+fmt_prY = ifdef ($fmt, "text");
+filePrY = ifdef ($prY, " ");
+fileSpY = ifdef ($spY, " ");
+fileX = ifdef ($X, " ");
+fileC = ifdef ($C, " ");
+fileO = ifdef ($O, " ");
+
+is_str_empty = TRUE;
+str = " ";
+
+print ("BEGIN K-MEANS SCORING SCRIPT");
+
+if (fileX != " ") {
+    print ("Reading X...");
+    X = read (fileX);
+    total_mean = colSums (X) / nrow (X);
+    total_ss = sum ((X - total_mean) ^ 2);
+}
+
+if ((fileC != " ") & (fileX == " ")) {
+    print ("ERROR: Cannot provide C without providing X.");
+} else {
+
+
+if (fileC != " ") {
+    print ("Reading C...");
+    C = read (fileC);
+    num_clusters = nrow (C);
+    ones_C = matrix (1, rows = num_clusters, cols = 1);
+    print ("Computing the predicted Y...");
+    D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2));
+    prY = rowIndexMin (D);
+    if (filePrY != " ") {
+        print ("Writing the predicted Y...");
+        write (prY, filePrY, format = fmt_prY);
+    }
+} else {
+    print ("Reading the predicted Y...");
+    prY = read (filePrY);
+    num_clusters = max (prY);
+    ones_C = matrix (1, rows = num_clusters, cols = 1);
+}
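+
+# Note on the distance computation above: the full squared Euclidean distance
+# expands as rowSums (X ^ 2) - 2 * (X %*% t(C)) + t(rowSums (C ^ 2)), and the
+# rowSums (X ^ 2) term is constant within each row, so dropping it cannot
+# change the per-row argmin taken by rowIndexMin.  A minimal sanity check,
+# kept as a comment because it is an illustrative sketch (D_full is a
+# hypothetical name, not part of the original script):
+#
+#   D_full = rowSums (X ^ 2) %*% matrix (1, rows = 1, cols = num_clusters)
+#          - 2 * (X %*% t(C)) + t(rowSums (C ^ 2));
+#   print ("argmin mismatches = " + sum (ppred (rowIndexMin (D_full), rowIndexMin (D), "!=")));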
To compare "actual" labels spY with given "predicted" labels prY: +# hadoop jar SystemML.jar -f Kmeans-predict.dml -nvargs spY=INPUT_DIR/Y +# prY=INPUT_DIR/PredY O=OUTPUT_DIR/stats + + +fmt_prY = ifdef ($fmt, "text"); +filePrY = ifdef ($prY, " "); +fileSpY = ifdef ($spY, " "); +fileX = ifdef ($X, " "); +fileC = ifdef ($C, " "); +fileO = ifdef ($O, " "); + +is_str_empty = TRUE; +str = " "; + +print ("BEGIN K-MEANS SCORING SCRIPT"); + +if (fileX != " ") { + print ("Reading X..."); + X = read (fileX); + total_mean = colSums (X) / nrow (X); + total_ss = sum( (X - total_mean)^2 ); +} + +if ((fileC != " ") & (fileX == " ")) { + print ("ERROR: Cannot provide C without providing X."); +} else { + + +if (fileC != " ") { + print ("Reading C..."); + C = read (fileC); + num_clusters = nrow (C); + ones_C = matrix (1, rows = num_clusters, cols = 1); + print ("Computing the predicted Y..."); + D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2)); + prY = rowIndexMin (D); + if (filePrY != " ") { + print ("Writing the predicted Y..."); + write (prY, filePrY, format=fmt_prY); + } +} else { + print ("Reading the predicted Y..."); + prY = read (filePrY); + num_clusters = max (prY); + ones_C = matrix (1, rows = num_clusters, cols = 1); +} + +if (fileX != " ") { + print ("Computing the WCSS..."); + # Compute projection matrix from clusters to records + P = matrix (0, rows = nrow (X), cols = num_clusters); + P [, 1 : max (prY)] = table (seq (1, nrow (X), 1), prY); + # Compute the means, as opposed to the centroids + cluster_sizes = t(colSums (P)); + record_of_ones = matrix (1, rows = 1, cols = ncol (X)); + M = (t(P) %*% X) / ((cluster_sizes + ppred (cluster_sizes, 0, "==")) %*% record_of_ones); + # Compute the WCSS for the means + wcss_means = sum ((X - P %*% M) ^ 2); + wcss_means_pc = 100.0 * wcss_means / total_ss; + bcss_means = sum (cluster_sizes * rowSums ((M - ones_C %*% total_mean) ^ 2)); + bcss_means_pc = 100.0 * bcss_means / total_ss; + # Output results + print ("Total Sum of Squares (TSS) = " + total_ss); + print ("WCSS for cluster means: " + (round (10000.0 * wcss_means_pc) / 10000.0) + "% of TSS = " + wcss_means); + print ("BCSS for cluster means: " + (round (10000.0 * bcss_means_pc) / 10000.0) + "% of TSS = " + bcss_means); + str = "TSS,," + total_ss; + str = append (str, "WCSS_M,," + wcss_means); + str = append (str, "WCSS_M_PC,," + wcss_means_pc); + str = append (str, "BCSS_M,," + bcss_means); + str = append (str, "BCSS_M_PC,," + bcss_means_pc); + is_str_empty = FALSE; +} + +if (fileC != " ") { + # Compute the WCSS for the centroids + wcss_centroids = sum ((X - P %*% C) ^ 2); + wcss_centroids_pc = 100.0 * wcss_centroids / total_ss; + bcss_centroids = sum (cluster_sizes * rowSums ((C - ones_C %*% total_mean) ^ 2)); + bcss_centroids_pc = 100.0 * bcss_centroids / total_ss; + # Output results + print ("WCSS for centroids: " + (round (10000.0 * wcss_centroids_pc) / 10000.0) + "% of TSS = " + wcss_centroids); + print ("BCSS for centroids: " + (round (10000.0 * bcss_centroids_pc) / 10000.0) + "% of TSS = " + bcss_centroids); + str = append (str, "WCSS_C,," + wcss_centroids); + str = append (str, "WCSS_C_PC,," + wcss_centroids_pc); + str = append (str, "BCSS_C,," + bcss_centroids); + str = append (str, "BCSS_C_PC,," + bcss_centroids_pc); +} + + + +if (fileSpY != " ") { + +print ("Reading the specified Y..."); +spY = read (fileSpY); +num_records = nrow (spY); + +if (num_records != nrow (prY) | ncol (spY) != 1 | ncol (prY) != 1) { + print ("ERROR: spY and/or prY size mismatch"); + print ("nrow (spY) = " + nrow (spY) + 
"; ncol (spY) = " + ncol (spY) + + "; nrow (prY) = " + nrow (prY) + "; ncol (prY) = " + ncol (prY)); +} else { + + print ("Computing the pairs counts..."); + + orig_min_spY = min (spY); + orig_min_prY = min (prY); + spY = spY + (1 - orig_min_spY); + prY = prY + (1 - orig_min_prY); + + spYprY_row_counts = table (spY, prY); + spY_row_counts = rowSums (spYprY_row_counts); + prY_row_counts = t(colSums (spYprY_row_counts)); + + # Count all pairs of rows having the same (spY, prY)-values + spYprY_pair_counts = spYprY_row_counts * (spYprY_row_counts - 1) / 2; + + # Count all pairs of rows having the same spY-values + spY_pair_counts = spY_row_counts * (spY_row_counts - 1) / 2; + # Count all pairs of rows having the same prY-values + prY_pair_counts = prY_row_counts * (prY_row_counts - 1) / 2; + + num_pairs = num_records * (num_records - 1.0) / 2.0; + + num_TP_pairs = sum (spYprY_pair_counts); + num_FP_pairs = sum (prY_pair_counts) - num_TP_pairs; + num_FN_pairs = sum (spY_pair_counts) - num_TP_pairs; + num_TN_pairs = num_pairs - num_TP_pairs - num_FP_pairs - num_FN_pairs; + + pct_TP_pairs = num_TP_pairs / num_pairs * 100.0; + pct_TN_pairs = num_TN_pairs / num_pairs * 100.0; + pct_FP_pairs = num_FP_pairs / num_pairs * 100.0; + pct_FN_pairs = num_FN_pairs / num_pairs * 100.0; + + if (is_str_empty) { + str = "TRUE_SAME_CT,," + num_TP_pairs; + is_str_empty = FALSE; + } else { + str = append (str, "TRUE_SAME_CT,," + num_TP_pairs); + } + str = append (str, "TRUE_SAME_PC,," + pct_TP_pairs); + str = append (str, "TRUE_DIFF_CT,," + num_TN_pairs); + str = append (str, "TRUE_DIFF_PC,," + pct_TN_pairs); + str = append (str, "FALSE_SAME_CT,," + num_FP_pairs); + str = append (str, "FALSE_SAME_PC,," + pct_FP_pairs); + str = append (str, "FALSE_DIFF_CT,," + num_FN_pairs); + str = append (str, "FALSE_DIFF_PC,," + pct_FN_pairs); + + pct_TP_pairs = round (pct_TP_pairs * 10000.0) / 10000.0; + pct_TN_pairs = round (pct_TN_pairs * 10000.0) / 10000.0; + pct_FP_pairs = round (pct_FP_pairs * 10000.0) / 10000.0; + pct_FN_pairs = round (pct_FN_pairs * 10000.0) / 10000.0; + + space_TP = ""; if (pct_TP_pairs < 100) {space_TP = " ";} if (pct_TP_pairs < 10) {space_TP = " ";} + space_TN = ""; if (pct_TN_pairs < 100) {space_TN = " ";} if (pct_TN_pairs < 10) {space_TN = " ";} + space_FP = ""; if (pct_FP_pairs < 100) {space_FP = " ";} if (pct_FP_pairs < 10) {space_FP = " ";} + space_FN = ""; if (pct_FN_pairs < 100) {space_FN = " ";} if (pct_FN_pairs < 10) {space_FN = " ";} + + print ("Same-cluster pairs predicted as Same-cluster ( True Pos): " + space_TP + + pct_TP_pairs + "% of all pairs" + " (" + num_TP_pairs + ")"); + print ("Diff-cluster pairs predicted as Diff-cluster ( True Neg): " + space_TN + + pct_TN_pairs + "% of all pairs" + " (" + num_TN_pairs + ")"); + print ("Diff-cluster pairs predicted as Same-cluster (False Pos): " + space_FP + + pct_FP_pairs + "% of all pairs" + " (" + num_FP_pairs + ")"); + print ("Same-cluster pairs predicted as Diff-cluster (False Neg): " + space_FN + + pct_FN_pairs + "% of all pairs" + " (" + num_FN_pairs + ")"); + + [spY_cids, prY_cids, full_counts, matching_counts, rounded_percentages] = + get_best_assignments (spYprY_row_counts); + + print (" "); + print ("Specified Categories versus Predicted Clusters:"); + + spY_cids = spY_cids + orig_min_spY - 1; + prY_cids = prY_cids + orig_min_prY - 1; + + for (i in 1 : nrow (spY_cids)) + { + cid = as.integer (castAsScalar (spY_cids [i, 1])); + pct = castAsScalar (rounded_percentages [i, 1]); + space_pct = ""; if (pct < 100) {space_pct = " ";} if (pct 
+
+    [spY_cids, prY_cids, full_counts, matching_counts, rounded_percentages] =
+        get_best_assignments (spYprY_row_counts);
+
+    print (" ");
+    print ("Specified Categories versus Predicted Clusters:");
+
+    # Undo the earlier shift to report the original label values
+    spY_cids = spY_cids + orig_min_spY - 1;
+    prY_cids = prY_cids + orig_min_prY - 1;
+
+    for (i in 1 : nrow (spY_cids))
+    {
+        cid = as.integer (castAsScalar (spY_cids [i, 1]));
+        pct = castAsScalar (rounded_percentages [i, 1]);
+        space_pct = "";  if (pct < 100) {space_pct = " ";}  if (pct < 10) {space_pct = "  ";}
+        print ("Category " + cid +
+            ": best pred. cluster is " + as.integer (castAsScalar (prY_cids [i, 1])) +
+            "; full count = " + as.integer (castAsScalar (full_counts [i, 1])) +
+            ", matching count = " + space_pct + pct + "% (" +
+            as.integer (castAsScalar (matching_counts [i, 1])) + ")");
+
+        str = append (str, "SPEC_TO_PRED," + cid + "," + castAsScalar (prY_cids [i, 1]));
+        str = append (str, "SPEC_FULL_CT," + cid + "," + castAsScalar (full_counts [i, 1]));
+        str = append (str, "SPEC_MATCH_CT," + cid + "," + castAsScalar (matching_counts [i, 1]));
+        str = append (str, "SPEC_MATCH_PC," + cid + "," + castAsScalar (rounded_percentages [i, 1]));
+    }
+
+    [prY_cids, spY_cids, full_counts, matching_counts, rounded_percentages] =
+        get_best_assignments (t(spYprY_row_counts));
+
+    print (" ");
+    print ("Predicted Clusters versus Specified Categories:");
+
+    prY_cids = prY_cids + orig_min_prY - 1;
+    spY_cids = spY_cids + orig_min_spY - 1;
+
+    for (i in 1 : nrow (prY_cids))
+    {
+        cid = as.integer (castAsScalar (prY_cids [i, 1]));
+        pct = castAsScalar (rounded_percentages [i, 1]);
+        space_pct = "";  if (pct < 100) {space_pct = " ";}  if (pct < 10) {space_pct = "  ";}
+        print ("Cluster " + cid +
+            ": best spec. categ. is " + as.integer (castAsScalar (spY_cids [i, 1])) +
+            "; full count = " + as.integer (castAsScalar (full_counts [i, 1])) +
+            ", matching count = " + space_pct + pct + "% (" +
+            as.integer (castAsScalar (matching_counts [i, 1])) + ")");
+
+        str = append (str, "PRED_TO_SPEC," + cid + "," + castAsScalar (spY_cids [i, 1]));
+        str = append (str, "PRED_FULL_CT," + cid + "," + castAsScalar (full_counts [i, 1]));
+        str = append (str, "PRED_MATCH_CT," + cid + "," + castAsScalar (matching_counts [i, 1]));
+        str = append (str, "PRED_MATCH_PC," + cid + "," + castAsScalar (rounded_percentages [i, 1]));
+    }
+
+    print (" ");
+}}}  # closes: size-check else, if (fileSpY), and the outer else
+
+if ((fileO != " ") & (! is_str_empty)) {
+    write (str, fileO);
+}
+
+print ("DONE: K-MEANS SCORING SCRIPT");
+
+
+
+get_best_assignments = function (Matrix[double] counts)
+return (Matrix[double] row_ids, Matrix[double] col_ids, Matrix[double] margins,
+        Matrix[double] max_counts, Matrix[double] rounded_percentages)
+{
+    # Keep only the rows with a positive margin, i.e. labels that occur
+    margins = rowSums (counts);
+    select_positive = diag (ppred (margins, 0, ">"));
+    select_positive = removeEmpty (target = select_positive, margin = "rows");
+    row_ids = select_positive %*% seq (1, nrow (margins), 1);
+    pos_counts = select_positive %*% counts;
+    pos_margins = select_positive %*% margins;
+    # Return only the margins of the occurring labels so that the output
+    # rows stay aligned with row_ids even when some label values are unused
+    margins = pos_margins;
+    # For each remaining row, find the maximum count and the first column
+    # attaining it
+    max_counts = rowMaxs (pos_counts);
+    one_per_column = matrix (1, rows = 1, cols = ncol (pos_counts));
+    max_counts_ppred = max_counts %*% one_per_column;
+    is_max_count = ppred (pos_counts, max_counts_ppred, "==");
+    aggr_is_max_count = t(cumsum (t(is_max_count)));
+    col_ids = rowSums (ppred (aggr_is_max_count, 0, "==")) + 1;
+    # Share of the row's margin captured by the best column, in percent,
+    # rounded to four decimals
+    rounded_percentages = round (1000000.0 * max_counts / pos_margins) / 10000.0;
+}
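+
+# Note on the argmax trick in get_best_assignments: is_max_count marks every
+# column that attains the row maximum; the row-wise cumsum stays 0 strictly
+# before the first such column, so counting the zeros and adding 1 gives the
+# index of the FIRST maximal column (ties break toward the left).  Worked
+# example: for a row of counts (3, 7, 7), is_max = (0, 1, 1), cumsum =
+# (0, 1, 2), one zero, hence col_id = 2.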
