Repository: incubator-madlib
Updated Branches:
  refs/heads/master 2f1c4b288 -> c82c0f382


Feature: Add grouping to weakly connected components

JIRA: MADLIB-1083

Add grouping support to weakly connected components. Make necessary
changes in the queries involved, docs, and install check.

Closes #147


Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/c82c0f38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/c82c0f38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/c82c0f38

Branch: refs/heads/master
Commit: c82c0f3823425239bc755e155f6d23ef6eb266af
Parents: 2f1c4b2
Author: Nandish Jayaram <njaya...@apache.org>
Authored: Thu Jun 29 10:17:01 2017 -0700
Committer: Nandish Jayaram <njaya...@apache.org>
Committed: Thu Jul 6 13:51:33 2017 -0700

----------------------------------------------------------------------
 doc/design/figures/wcc_example.pdf              | 163 +++++++++++++++++++
 doc/design/modules/graph.tex                    | 111 +++++++++++++
 doc/literature.bib                              |  10 ++
 doc/mainpage.dox.in                             |   3 +
 .../postgres/modules/graph/test/wcc.sql_in      |  46 +++++-
 src/ports/postgres/modules/graph/wcc.py_in      | 146 ++++++++++++++---
 src/ports/postgres/modules/graph/wcc.sql_in     |  52 +++---
 7 files changed, 479 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/doc/design/figures/wcc_example.pdf
----------------------------------------------------------------------
diff --git a/doc/design/figures/wcc_example.pdf 
b/doc/design/figures/wcc_example.pdf
new file mode 100644
index 0000000..a57cd5e
--- /dev/null
+++ b/doc/design/figures/wcc_example.pdf
@@ -0,0 +1,163 @@
+%PDF-1.4
+%����
+1 0 obj
+   << 
+      /Title ()
+      /Author ()
+      /Subject ()
+      /Keywords ()
+      /Creator (yExport 1.5)
+      /Producer (org.freehep.graphicsio.pdf.YPDFGraphics2D 1.5)
+      /CreationDate (D:20170703114556-07'00')
+      /ModDate (D:20170703114556-07'00')
+      /Trapped /False
+   >>
+endobj
+2 0 obj
+   << 
+      /Type /Catalog
+      /Pages 3 0 R
+      /ViewerPreferences 4 0 R
+      /OpenAction [5 0 R /Fit]
+   >>
+endobj
+4 0 obj
+   << 
+      /FitWindow true
+      /CenterWindow false
+   >>
+endobj
+5 0 obj
+   << 
+      /Parent 3 0 R
+      /Type /Page
+      /Contents 6 0 R
+   >>
+endobj
+6 0 obj
+   << 
+      /Length 7 0 R
+      /Filter [/ASCII85Decode /FlateDecode]
+   >>
+stream
+Gb!SobDpGMD4T9[g`P`#4\._j)&Y-d5s'^XK*(B\e.%T9rrI0m;VHJi.X0dC:Q2;q&*g<\85U)?c%dbj
+ao;1'DEiTm2d#p`IscVArpnr"pPId7C?iirh\H]b\!K17\!LH.rr(qCNR,B;le?pfnfR\^?iTi814K/I
+?U+PgTE"DD5Pk>6^GLdsr[4lAW11i@5O.B.GHREJC&!0(qQEWmWTF955F!t?%PA[E\%-d"oY]RU7Y=k5
+qYnq6]\p8Q[Im@Zc[Wl[r\#pSq558&djEs2n[nFC1ep*OKDn+54nRXQXA3p:#F8>M1;6agr/dF.8Z;8H
+Y:m;DUR+,IDj9q;(NUUX#*PX\j2s<Bc1P)gBY.-M45\q@hW`c::"c7qrq')WNiKMi;ia6]NpE4UR'8G^
+7r'4)FS&<JoCGi@MUR\,nMj.LS]mk)$f:e<*Eq=YON24M5[_`Q!rn:%^MWmm..HQWcbDV;V_[O3(P-'d
+db@2,d`+utU@Y)ed2Sq5S'29;Ne0$0)go+I@b!=sDNpn[ki-l38X^WqT6D_<CqIUYJKa\+8$a_toOpMg
+ctoX$EuJd9Ys"J\+.[Gp+K^&*E,Xg!p'jPI=q?`/Ps[$8"6ZEiNn-K!]pc+6:-*"L>SAFrX10K<&Y+Hp
+;!104gCBe;W+$]2%2-dVD6<C*ZEW0^=X_$+3f#5)j/VM^YYNq:_I`#!;;Vg2Vcct[9&Ze,H87U;949UK
+m+s$U',q[&"*4R6#n2h@+gT)&Vn7CN%DL>\^s><$hY]2;[58fJ.aO3CkM4WKGN[bBmX)'bl@Et]2W6E&
+^"A/$:[S1rrgeP5s/YRCF2W),(KWptmaurW`Vm"cP;u]gm%K(OA\.H?\Ud\XcRI>&Dd%NueJPDIb'N7J
+<<$T)EDoLnAqM]O0)263D-L9"*8@t;\fAr>[bm8<,EV</YW8XF>2UYbV>U\6_RUE5*B9N921u7S(oX#S
+_4CF-?k]Amo.AGrr.Rip]$RM$.gs+_Qu:<Uc%jNYo[1[\k(U&Q4@hA3=&5V^/g1J)l,r+$No4&7FSHM>
+Y?XTH*eb;#ABKUd@0p)&9a*0%B#n56ak<F@6h%1c%V7'IG>4qekGTB,Eh,$_,]L5[)n<^k!p.:iQaBQ3
+:g\:S%RWH0l<9WOkm6J4B,N[]4]bLPCj-ahV+jJjH"e#%pU\Z`MojPDUX3H(US1%/bhqi3qt`=$m[*Z(
+ch"[sjedriI7IK6U'iROV@/C\&Yo.kREJ=eKmcIL1>V^X?*%%9<,Rpu*d$p7T*mZVgGNKdN?^Yd>b)BZ
+/jLcWq6j;CR9Cpif6]X9LT@!AD"$o9hrhblZJl`R*YgaA8F/(ok.%Y*=8e)en`sE;o58&2Ef$s50,u^;
+=qH'NK0!,fg<lQs\9:>og2E5Sb1Gb[p6[H\,MigB3uX&X""stOnh0BpDrXZY0c7T"fS*g)pU"cN!GP.u
+pgWLW5Tqk@cp>8S"Pm1G;:!Y3(rnVhFTWPR=_d)J/R(fba[Tpm<!IBFNB/6bT1c9,beU2=ZMOt(n5X])
+lLMCL.!$SdTKpUrc7q7HL.BQEq$DP+D>@&.Ms`ZQ`"`m()tuS?_N6g(5%a&OHMRt*rOap2QcULO/`FmE
+T4pHN73e(="#Gd4:Qfui6Bhh"?:X3oAb<,UKF<XE0$$-N6EcWgb]D:O5!oM:Fg[.R).@pkHi3LA"3LDi
+^PdqtqKEhQieIVQs1UtoNA=l:DZ;?C0E,8L``Eu6=>`S07O`Pqc]'X>B[_6RKV;)*Rb`<]ft8.I-tYbW
+Ba_WF>;"qUSa]NBn6q&`P`^2D/Wrq;a!NODa=IeRA_h$EVQqq/$W8HI30M@,Gk!/*atT5!)Re`M;+;G&
+n<QD]A"culD<,b;*9CZEU>uKV5oGd#7;9iUTqUN[<]s<4X7a$A!Y!"&;K4i*Z;,,BMM;P$W?lj9"9q2'
+@RK"d;<YK\UQZb?:p&5>/OY%'$YLa2*gn/PCWr!Y$&MO&9.t-,Q`5sc1b>BZ&FT8^ap?>=\PX1.()+2&
+rTAHJ06du0gE"Cl*T<K$<l*2/ZSZ4,)^rlrR+k^U>bfH+&X5mU95Ir3K9Xi]*0i;R-ueGdboUq_#BFl`
+H5X,'k?DU'2(aaNDAAA5KiL3m)3f=toZC9_kW.oi2+k%2H@QI6A[QD]dndDufpD89p%pba-=>Y_5-[Rj
++$tZSl`Q/"`<1:JNGW/D/XsR28$=t...@ohe.ld=.kW=2$ROE7p3p:mN'XZM;G>AN-8Wb.G;^mPj-PD[%dN
+qMbmGkl6XbccRK8-2NTUhm+Wu2_J@0h`(`N[alGs`kWt:7]6+nf9MsP@"jLu,n-<W6B?X3L+(0u++BV.
+,B:(/i(WJ+/e/gWQ'u4lN.=pQ"]i2@L<c'N60(Gne]3ZM#H0sJ;8#m650@0;MZ_2L,V,Tk[8eEZ@LF3N
+,Kqb'0^M[17"D#6S',!sok!1M>o`95Lad?6<N0c8ReoE(;n7kPiZs]neqXHB%r4S5!\)W;7&L_m#Qfj0
+nr;05!d9`Wcnb1c)>%.!%]M=K!7g),;7r/pGZK+0=lGn:=Q`p#O5OQ:>%FKROqqeRfgNoGY;R$'Ic<!*
+QhO+uo?c#O!h'rV;&:/^n)u,W;EH[oC0q=,p"cHW9"IL8NNugbg>'L;"_%eZ.HSQN(>i4O7eK6MGJ6lG
+cu:u?N;#8sYjG%i,H^O><YbcK$c;q[Z$B%-rm)(TB$0B;cRtP4>C#^fG.!EK/*dlgp$dG=dj.1=*#1tG
+7lC4:iM=E]J(7cgL>_#aVBH=UDI^:dN@L]L6h#TU1KRr&c^%\Ms'^'sVSipl?6FJP(oAqH1?H\-qtHnV
+6ti$iLAqpJc.-%`rb6otn:(i=n:(fVqELZgJ`-P:A_1j'3ZQO.m9I';C&!#fAn*k!T?5E#?9YGKceSpk
+*ZV8RH6>&:ga68Qk&/@3AES"!X"*HI0fj7CcA5T8l<E40lH0S+as\X`0aCRZ$XknKOa`=:h/#pu'Re&(
+3-TW&01Z/F9\qb`L$fmppbe...@cgno.zbf,4,d1lLJij[]sPXcG5NEVcU,H1\([hPY-18B[/3Z]f3GU
+5@/^lFkWlMJN9jcE,+R7nq<:K"!'2SNL4m!n_PC#,<o%.SC?-aC1tc/R`0@B(tGiO?Cm.FpBERRi1NL+
+W71S=Bu(u`n=?/eib=.Q:!43K-S!slgpjYOLmMT0QR5euSdVh_#b4#\9_CNkL8]+Vd#mWJkB#-qBF#7,
+m.PM_>$q<qou(+9Q('Ba*cCh#d=k-+ksF[JkX+QoZBh!($i>r"V;1hu?*%%9[,!-7q*4b>6`PU7gheU6
+qhUeG:@BpU7BX&\C\(eHA+a)"V#$EK4P*d3jMspb*q-96?MNHdkE^)GaS*R?D>a`Qmp<0mpo<lg#!UQ8
+4[tEh02T#9s$s`sYPF7SPZYT0kT!jeG2qdggZRs1If2A!`(GNA>8FTMk;)<u=<c)=1iHNYU:IV:+IQ<R
+KiYXfYidd"ZJ>5X/82.(UZFsJmX1X1ktgQ,[=1>[7"gX?q7[*,R[C`Xj[2sG3hl2=8J(^db(h<4j<'Wt
+9oTT>YC(.#Q&go-q#KO/2$riQBB94Tl)K,=<R%SQ!L\\4Pn2ePcd@@&,q:"h)jJ#c*ud`3K<smDI#sPn
+g'1SP]B800ji_=)j<'WtT[qE2LSjJsM;gS`?G1Aj/ejS3NEQ&WP-HJY5ZuFU#M,U"]NC>c=*[pDJ:)1<
+7-+OAj.'34K4d2FjT@U5D,icM:r->#&$2i'C*S:Rj[1h2nsrghE)baPHV=)FYDV5+^FF[O8&\91H9EZ+
+ILDeG)#M"eT=?fNotAe+LV?h,*r^Ut"c88sN;!5`(c/JXj>03b7M4[.=-5[-e,0!ar"5da_Pe]aAV>T"
+^<5"c)cOC+*,#IX7#cs;rh=&@AjB5+G`Wfrf_n-?/Ff%bf@PM"bGWb0c=?8ArXR"4EUjp=C#.,d`>UE&
+8+nU,M'bj\r^nKC4,JX*Zrk*7<BXNmRHlfGd'(^#>KA2(jPb<K$$Cf.IqsI]">[Ml@ecKI!h`g4.nsBc
+:]@2A8QbdXX>=muID$B[Ppb-%4u.jE!6LcOErMd2UH2hQZ>[a,mKU3RK$1h$02\icTcm7*8[NjVl8JuX
+,ee0F3QQ-25B`ADf<;56#&Mf6m11*R*liGriDg=$#,lJ1#I1_OJ"=gQVTW%j>Q;dhU'ElTJ#EsO&A6lm
+o1,'hG\]5ScJZRl2J7OA%HlMS#/CNc8RU8mDr8<6r9<&_0^i@sLf"$R2!YU`n"nkhep,Xc])R!^<S5Hj
+rCc66@^/YIrhW)&2S8o\l2HEF`63(P+,GfPgV6]W+^uMS#OMIrjaFKDk5&!tAe`93p,hn9Z,WX<d/CH7
+AGWnBhERDL:uKYpj8Qle9PTCX:SrnaG#<Ma+<R>g*&g@,?u&!QIrPF:rr;sFb<Ob]Ig:7?"KM~>
+endstream
+endobj
+7 0 obj
+   4449
+endobj
+3 0 obj
+   << 
+      /Parent null
+      /Type /Pages
+      /MediaBox [0.0000 0.0000 307.00 279.00]
+      /Resources 8 0 R
+      /Kids [5 0 R]
+      /Count 1
+   >>
+endobj
+9 0 obj
+   [/PDF /Text /ImageC]
+endobj
+10 0 obj
+   << 
+      /S /Transparency
+      /CS /DeviceRGB
+      /I true
+      /K false
+   >>
+endobj
+11 0 obj
+   << 
+      /Alpha1
+      << 
+         /ca 1.0000
+         /CA 1.0000
+         /BM /Normal
+         /AIS false
+      >>
+   >>
+endobj
+8 0 obj
+   << 
+      /ProcSet 9 0 R
+      /ExtGState 11 0 R
+   >>
+endobj
+xref
+0 12
+0000000000 65535 f 
+0000000015 00000 n 
+0000000315 00000 n 
+0000005191 00000 n 
+0000000445 00000 n 
+0000000521 00000 n 
+0000000609 00000 n 
+0000005168 00000 n 
+0000005645 00000 n 
+0000005361 00000 n 
+0000005400 00000 n 
+0000005502 00000 n 
+trailer
+<< 
+   /Size 12
+   /Root 2 0 R
+   /Info 1 0 R
+>>
+startxref
+5718
+%%EOF

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/doc/design/modules/graph.tex
----------------------------------------------------------------------
diff --git a/doc/design/modules/graph.tex b/doc/design/modules/graph.tex
index ec46842..f9183b3 100644
--- a/doc/design/modules/graph.tex
+++ b/doc/design/modules/graph.tex
@@ -29,6 +29,7 @@
                \item[v0.2] Graph Framework, SSSP implementation details.
         \item[v0.3] PageRank
         \item[v0.4] APSP
+        \item[v0.5] Weakly Connected Components
        \end{modulehistory}
 \end{moduleinfo}
 
@@ -505,3 +506,113 @@ a high $threshold$ value would lead to early termination 
of PageRank
 computation, thus resulting in incorrect PageRank values.
 
 
+\section{Weakly Connected Components} \label{sec:graph:wcc}
+\begin{figure}[h]
+    \centering
+    \includegraphics[width=0.5\textwidth]{figures/wcc_example.pdf}
+\caption{An example disconnected directed graph}
+\label{wcc:example}
+\end{figure}
+
+Given a directed graph $G$, a weakly connected component is a maximal subgraph
+$G_{sub}$ of $G$, such that there exists a path from every vertex in $G_{sub}$
+to every other vertex in $G_{sub}$, ignoring the direction of the edges.
+
+The weakly connected component module implemented in MADlib is based on
+GRAIL~\cite{grail}. All vertices are initialized with their own vertex
+ID as the component ID, and are considered to be active. In every iteration,
+each active vertex's component ID is updated with the smallest component ID
+value of all its neighbors. Any vertex whose component ID is not updated in
+the current iteration is deemed as an inactive vertex for the next iteration.
+Execution continues until there are no active vertices left. Since each vertex
+is initialized with its own ID as the component ID, and updated based on
+neighboring nodes' component IDs, the final component ID of a component will
+be equal to the smallest vertex ID in the corresponding subgraph.
+Figure~\ref{wcc:example} shows an example directed graph with two disconnected
+subgraphs. The subgraph containing vertices $1$, $2$, $3$, $4$, $5$ and $6$
+forms a weakly connected component, and is assigned component ID 1, while the
+subgraph containing vertices $12$, $14$, $21$ and $23$ forms the second component
+and is assigned component ID 12.
+
+\subsection{Implementation Details} \label{sec:wcc:implementation}
+
+In this section, we discuss the MADlib implementation of weakly connected
+components in depth. We maintain the following tables at every iteration:
+$oldupdate$, $toupdate$, $message$ and $newupdate$. In $newupdate$, the component ID
+of each vertex is initialized to infinity, while the component ID of vertices
+in the $message$ table is initialized to their corresponding vertex ID.
+
+\begin{algorithm}[Weakly Connected Components$(V,E)$] \label{alg:wcc:high}
+\begin{algorithmic}[1]
+    \State Create $newupdate$ table with a default component ID of
+            $infinity$ for every vertex
+    \State Create $message$ table with a default component ID of the
+            corresponding $id$ (vertex ID) for every vertex
+    \Repeat
+        \State Update the $oldupdate$ table
+        \State Update $toupdate$ table with active vertices
+        \State Update the $newupdate$ table
+        \State Update $message$ table with potential new component IDs for each vertex
+    \Until {There are no active vertices in $toupdate$ table}
+\end{algorithmic}
+\end{algorithm}
+
+For each vertex, the $message$ table contains the component IDs associated
+with its immediate neighbors. At each iteration, $oldupdate$ is updated with
+the minimum of all the associated component IDs found for a vertex in $message$.
+
+\begin{algorithm}[Update oldupdate table]
+\begin{lstlisting}
+SELECT id, MIN(message.component_id) as component_id
+FROM message
+GROUP BY id
+\end{lstlisting}
+\end{algorithm}
+
+Table $toupdate$ records all vertices whose component IDs must be updated,
+and are thus marked active.
+
+\begin{algorithm}[Update toupdate table with active vertices]
+\begin{lstlisting}
+-- Find vertices whose component ID must be updated
+CREATE TABLE toupdate AS
+SELECT oldupdate.id, oldupdate.component_id
+FROM oldupdate, newupdate
+WHERE oldupdate.id = newupdate.id AND
+        oldupdate.component_id < newupdate.component_id
+
+-- Update the component IDs
+UPDATE newupdate SET
+component_id = toupdate.component_id
+FROM toupdate
+WHERE newupdate.id = toupdate.id
+\end{lstlisting}
+\end{algorithm}
+
+Finally, the $message$ table is updated with potential new
+component IDs for active vertices using the following query:
+
+\begin{algorithm}[Update message table]
+\begin{lstlisting}
+CREATE TEMP TABLE message AS
+SELECT id, MIN(component_id) AS component_id
+FROM (
+    SELECT edge.src AS id,
+        toupdate.component_id
+    FROM toupdate, edge
+    WHERE edge.dest = toupdate.id
+    UNION ALL
+    SELECT edge.dest AS id,
+        toupdate.component_id
+    FROM toupdate, edge
+    WHERE edge.src = toupdate.id
+) AS t
+GROUP BY id
+\end{lstlisting}
+\end{algorithm}
+
+At the end of the computation, $newupdate$ will have the component ID
+associated with each vertex in $G$. The component ID of all the vertices
+in a component is equal to the smallest vertex ID in the corresponding
+subgraph.
+

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/doc/literature.bib
----------------------------------------------------------------------
diff --git a/doc/literature.bib b/doc/literature.bib
index d6941c4..08fb2dd 100644
--- a/doc/literature.bib
+++ b/doc/literature.bib
@@ -921,3 +921,13 @@ Applied Survival Analysis},
        howpublished = 
{\url{http://users.cecs.anu.edu.au/~Alistair.Rendell/Teaching/apac_comp3600/module4/all_pairs_shortest_paths.xhtml}},
        note = {Accessed: 2017-06-07}
 }
+
+@inproceedings{grail,
+  author    = {Jing Fan and
+               Adalbert Gerald Soosai Raj and
+               Jignesh M. Patel},
+  title     = {The Case Against Specialized Graph Analytics Engines},
+  booktitle = {{CIDR} 2015, Seventh Biennial Conference on Innovative Data Systems
+               Research, Asilomar, CA, USA, January 4-7, 2015, Online Proceedings},
+  year      = {2015}
+}

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/doc/mainpage.dox.in
----------------------------------------------------------------------
diff --git a/doc/mainpage.dox.in b/doc/mainpage.dox.in
index 9922ed2..e16f9b2 100644
--- a/doc/mainpage.dox.in
+++ b/doc/mainpage.dox.in
@@ -135,6 +135,9 @@ complete matrix stored as a distributed table.
     @defgroup grp_sssp Single Source Shortest Path
     @ingroup grp_graph
 
+    @defgroup grp_wcc Weakly Connected Components
+    @ingroup grp_graph
+
 @defgroup grp_mdl Model Evaluation
 @{Contains functions for evaluating accuracy and validation of predictive 
methods. @}
     @defgroup grp_validation Cross Validation

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/src/ports/postgres/modules/graph/test/wcc.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/test/wcc.sql_in 
b/src/ports/postgres/modules/graph/test/wcc.sql_in
index f9430f1..3751eb0 100644
--- a/src/ports/postgres/modules/graph/test/wcc.sql_in
+++ b/src/ports/postgres/modules/graph/test/wcc.sql_in
@@ -54,6 +54,38 @@ INSERT INTO edge VALUES
 (3, 0, 1),
 (5, 6, 1),
 (6, 3, 1),
+(10, 11, 1),
+(10, 12, 1),
+(11, 12, 1),
+(11, 13, 1),
+(12, 13, 1),
+(13, 10, 1),
+(15, 16, 1),
+(15, 14, 1);
+
+DROP TABLE IF EXISTS wcc_out;
+SELECT weakly_connected_components(
+    'vertex',
+    'vertex_id',
+    'edge',
+    'src=src_node,dest=dest_node',
+    'wcc_out');
+
+SELECT assert(relative_error(count(distinct component_id), 4) < 0.00001,
+        'Weakly Connected Components: Number of components found is not 4.'
+    ) FROM wcc_out;
+
+INSERT INTO edge VALUES
+(0, 1, 2),
+(0, 2, 2),
+(1, 2, 2),
+(1, 3, 2),
+(2, 3, 2),
+(2, 5, 2),
+(2, 6, 2),
+(3, 0, 2),
+(5, 6, 2),
+(6, 3, 2),
 (10, 11, 2),
 (10, 12, 2),
 (11, 12, 2),
@@ -69,8 +101,16 @@ SELECT weakly_connected_components(
     'vertex_id',
     'edge',
     'src=src_node,dest=dest_node',
-    'wcc_out');
+    'wcc_out',
+    'user_id');
+-- NOTE: The disconnected vertex '4' is not seen as a separate component
+-- in either group. This way of handling disconnected nodes is consistent
+-- with other graph modules that support grouping. At the moment (6/30/17),
+-- we have no way of including disconnected nodes inside a group.
+SELECT assert(relative_error(count(distinct component_id), 3) < 0.00001,
+        'Weakly Connected Components: Number of components found is not 3.'
+    ) FROM wcc_out WHERE user_id=2;
 
-SELECT assert(relative_error(count(distinct component_id), 4) < 0.00001,
-        'Weakly Connected Components: Number of components found is not 4.'
-    ) FROM wcc_out;
+SELECT assert(relative_error(count(distinct component_id), 3) < 0.00001,
+        'Weakly Connected Components: Number of components found is not 3.'
+    ) FROM wcc_out WHERE user_id=1;

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/src/ports/postgres/modules/graph/wcc.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/wcc.py_in 
b/src/ports/postgres/modules/graph/wcc.py_in
index d07ac05..02cceeb 100644
--- a/src/ports/postgres/modules/graph/wcc.py_in
+++ b/src/ports/postgres/modules/graph/wcc.py_in
@@ -28,7 +28,6 @@
 """
 
 import plpy
-from utilities.control import MinWarning
 from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string, split_quoted_delimited_str
@@ -37,7 +36,6 @@ from graph_utils import *
 
 m4_changequote(`<!', `!>')
 
-
 def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
         edge_params, out_table, grouping_cols_list, module_name):
     """
@@ -50,8 +48,14 @@ def validate_wcc_args(schema_madlib, vertex_table, 
vertex_id, edge_table,
         # to be column names in the edge_table, and not expressions!
         _assert(columns_exist_in_table(edge_table, grouping_cols_list, 
schema_madlib),
                 "Weakly Connected Components error: One or more grouping 
columns specified do not exist!")
-        with MinWarning("warning"):
-            plpy.warning("Grouping is not currently supported at the moment.")
+
+
+def prefix_tablename_to_colnames(table, cols_list):
+    return ' , '.join(["{0}.{1}".format(table, col) for col in cols_list])
+
+def get_where_condition(table1, table2, cols_list):
+    return ' AND '.join(['{0}.{2}={1}.{2}'.format(table1, table2, col)
+            for col in cols_list])
 
 def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     out_table, grouping_cols, **kwargs):
@@ -97,17 +101,82 @@ def wcc(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
 
     distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
         <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
-
+    subq_prefixed_grouping_cols = ''
+    comma_toupdate_prefixed_grouping_cols = ''
+    comma_oldupdate_prefixed_grouping_cols = ''
+    old_new_update_where_condition = ''
+    new_to_update_where_condition = ''
+    edge_to_update_where_condition = ''
     is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
 
     INT_MAX = 2147483647
     component_id = 'component_id'
-    plpy.execute("""
-            CREATE TABLE {newupdate} AS
-            SELECT {vertex_id}, CAST({INT_MAX} AS INT) AS {component_id}
-            FROM {vertex_table}
-            {distribution}
-        """.format(**locals()))
+    if grouping_cols:
+        distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
+        <!"DISTRIBUTED BY ({0},{1})".format(grouping_cols, vertex_id)!>)
+        # Update some variables useful for grouping based query strings
+        subq = unique_string(desp='subquery')
+        distinct_grp_table = unique_string(desp='grptable')
+        plpy.execute("""
+                CREATE TABLE {distinct_grp_table} AS
+                SELECT DISTINCT {grouping_cols} FROM {edge_table}
+            """.format(**locals()))
+        comma_toupdate_prefixed_grouping_cols = ', ' + 
prefix_tablename_to_colnames(toupdate,
+            grouping_cols_list)
+        comma_oldupdate_prefixed_grouping_cols = ', ' + 
prefix_tablename_to_colnames(
+            oldupdate, grouping_cols_list)
+        subq_prefixed_grouping_cols = prefix_tablename_to_colnames(subq,
+            grouping_cols_list)
+        old_new_update_where_condition = ' AND ' + get_where_condition(
+            oldupdate, newupdate, grouping_cols_list)
+        new_to_update_where_condition = ' AND ' + get_where_condition(
+            newupdate, toupdate, grouping_cols_list)
+        edge_to_update_where_condition = ' AND ' + get_where_condition(
+            edge_table, toupdate, grouping_cols_list)
+        plpy.execute("""
+                CREATE TABLE {newupdate} AS
+                SELECT {subq}.{vertex_id},
+                        CAST({INT_MAX} AS INT) AS {component_id}
+                        {select_grouping_cols}
+                FROM {distinct_grp_table} INNER JOIN (
+                    SELECT {select_grouping_cols_clause} {src} AS {vertex_id}
+                    FROM {edge_table}
+                    UNION
+                    SELECT {select_grouping_cols_clause} {dest} AS {vertex_id}
+                    FROM {edge_table}
+                ) {subq}
+                ON {join_grouping_cols}
+                GROUP BY {group_by_clause}
+                {distribution}
+            """.format(select_grouping_cols=','+subq_prefixed_grouping_cols,
+                join_grouping_cols=get_where_condition(subq,
+                    distinct_grp_table, grouping_cols_list),
+                group_by_clause='' if not grouping_cols else
+                    subq_prefixed_grouping_cols+', {0}.{1}'.format(subq, 
vertex_id),
+                select_grouping_cols_clause='' if not grouping_cols else
+                    grouping_cols+', ', **locals()))
+        plpy.execute("""
+                CREATE TEMP TABLE {message} AS
+                SELECT {vertex_id},
+                        CAST({vertex_id} AS INT) AS {component_id}
+                        {select_grouping_cols_clause}
+                FROM {newupdate}
+                {distribution}
+            """.format(select_grouping_cols_clause='' if not grouping_cols else
+                    ', '+grouping_cols, **locals()))
+    else:
+        plpy.execute("""
+                CREATE TABLE {newupdate} AS
+                SELECT {vertex_id}, CAST({INT_MAX} AS INT) AS {component_id}
+                FROM {vertex_table}
+                {distribution}
+            """.format(**locals()))
+        plpy.execute("""
+                CREATE TEMP TABLE {message} AS
+                SELECT {vertex_id}, CAST({vertex_id} AS INT) AS {component_id}
+                FROM {vertex_table}
+                {distribution}
+            """.format(**locals()))
     if is_hawq:
         plpy.execute("""
                 CREATE TABLE {temp_out_table} AS
@@ -115,12 +184,6 @@ def wcc(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
                 LIMIT 0
                 {distribution}
             """.format(**locals()))
-    plpy.execute("""
-            CREATE TEMP TABLE {message} AS
-            SELECT {vertex_id}, CAST({vertex_id} AS INT) AS {component_id}
-            FROM {vertex_table}
-            {distribution}
-        """.format(**locals()))
     nodes_to_update = 1
     while nodes_to_update > 0:
         # This idea here is simple. Look at all the neighbors of a node, and
@@ -135,18 +198,25 @@ def wcc(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
             CREATE TEMP TABLE {oldupdate} AS
             SELECT {message}.{vertex_id},
                     MIN({message}.{component_id}) AS {component_id}
+                    {grouping_cols_select}
             FROM {message}
-            GROUP BY {vertex_id}
+            GROUP BY {group_by_clause} {vertex_id}
             {distribution}
-        """.format(**locals()))
+        """.format(grouping_cols_select='' if not grouping_cols else
+                ', {0}'.format(grouping_cols), group_by_clause=''
+                if not grouping_cols else '{0}, '.format(grouping_cols),
+                **locals()))
 
         plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate))
         plpy.execute("""
             CREATE TEMP TABLE {toupdate} AS
-            SELECT {oldupdate}.{vertex_id}, {oldupdate}.{component_id}
+            SELECT {oldupdate}.{vertex_id},
+                    {oldupdate}.{component_id}
+                    {comma_oldupdate_prefixed_grouping_cols}
             FROM {oldupdate}, {newupdate}
             WHERE {oldupdate}.{vertex_id}={newupdate}.{vertex_id}
                 AND {oldupdate}.{component_id}<{newupdate}.{component_id}
+                {old_new_update_where_condition}
             {distribution}
         """.format(**locals()))
 
@@ -160,6 +230,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, 
edge_args,
                             SELECT *
                             FROM {toupdate}
                             WHERE 
{newupdate}.{vertex_id}={toupdate}.{vertex_id}
+                            {new_to_update_where_condition}
                         )
                         UNION
                         SELECT * FROM {toupdate};
@@ -179,28 +250,49 @@ def wcc(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
                     {component_id}={toupdate}.{component_id}
                     FROM {toupdate}
                     WHERE {newupdate}.{vertex_id}={toupdate}.{vertex_id}
+                        {new_to_update_where_condition}
                 """.format(**locals()))
 
         plpy.execute("DROP TABLE IF EXISTS {0}".format(message))
         plpy.execute("""
             CREATE TEMP TABLE {message} AS
             SELECT {vertex_id}, MIN({component_id}) AS {component_id}
+                    {select_grouping_cols}
             FROM (
-                SELECT {edge_table}.{src} AS {vertex_id}, 
{toupdate}.{component_id}
+                SELECT {edge_table}.{src} AS {vertex_id},
+                    {toupdate}.{component_id}
+                    {comma_toupdate_prefixed_grouping_cols}
                 FROM {toupdate}, {edge_table}
                 WHERE {edge_table}.{dest} = {toupdate}.{vertex_id}
+                    {edge_to_update_where_condition}
                 UNION ALL
-                SELECT {edge_table}.{dest} AS {vertex_id}, 
{toupdate}.{component_id}
+                SELECT {edge_table}.{dest} AS {vertex_id},
+                    {toupdate}.{component_id}
+                    {comma_toupdate_prefixed_grouping_cols}
                 FROM {toupdate}, {edge_table}
                 WHERE {edge_table}.{src} = {toupdate}.{vertex_id}
+                    {edge_to_update_where_condition}
             ) AS t
-            GROUP BY {vertex_id}
-        """.format(**locals()))
+            GROUP BY {group_by_clause} {vertex_id}
+        """.format(select_grouping_cols='' if not grouping_cols
+                else ', {0}'.format(grouping_cols), group_by_clause=''
+                if not grouping_cols else ' {0}, '.format(grouping_cols),
+                **locals()))
 
         plpy.execute("DROP TABLE {0}".format(oldupdate))
-        nodes_to_update = plpy.execute("""
-                            SELECT COUNT(*) AS cnt FROM {toupdate}
-                        """.format(**locals()))[0]["cnt"]
+        if grouping_cols:
+            nodes_to_update = plpy.execute("""
+                                SELECT SUM(cnt) AS cnt_sum
+                                FROM (
+                                    SELECT COUNT(*) AS cnt
+                                    FROM {toupdate}
+                                    GROUP BY {grouping_cols}
+                                ) t
+                """.format(**locals()))[0]["cnt_sum"]
+        else:
+            nodes_to_update = plpy.execute("""
+                                SELECT COUNT(*) AS cnt FROM {toupdate}
+                            """.format(**locals()))[0]["cnt"]
 
     plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(newupdate, out_table))
     plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3}

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/src/ports/postgres/modules/graph/wcc.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/wcc.sql_in 
b/src/ports/postgres/modules/graph/wcc.sql_in
index af20281..a02db55 100644
--- a/src/ports/postgres/modules/graph/wcc.sql_in
+++ b/src/ports/postgres/modules/graph/wcc.sql_in
@@ -35,9 +35,7 @@ m4_include(`SQLCommon.m4')
 <div class="toc"><b>Contents</b>
 <ul>
 <li><a href="#wcc">Weakly Connected Components</a></li>
-<li><a href="#notes">Notes</a></li>
 <li><a href="#examples">Examples</a></li>
-<li><a href="#literature">Literature</a></li>
 </ul>
 </div>
 
@@ -89,24 +87,23 @@ this string argument:
 It will contain a row for every vertex from 'vertex_table' with
 the following columns:
   - vertex_id : The id of a vertex. Will use the input parameter 'vertex_id' 
for column naming.
-  - component_id : The vertex's component.
+  - component_id : Component that the vertex belongs to.
+  We use the convention where 'component_id' is the smallest vertex id
+  in the component.  It means that component ids
+  are generally not contiguous.
   - grouping_cols : Grouping column (if any) values associated with the 
vertex_id.</dd>
 
 <dt>grouping_cols (optional)</dt>
 <dd>TEXT, default: NULL. A single column or a list of comma-separated
-columns that divides the input data into discrete groups, resulting in one
-distribution per group. When this value is NULL, no grouping is used and
-a single model is generated for all data.
-@note Grouping is not currently supported at the moment.</dd>
+columns that divides the input data into discrete groups, which are 
+treated independently as separate graphs.
+When this value is NULL, no grouping is used and
+weakly connected components are generated for all data 
+(single graph).
+@note Expressions are not currently supported for 'grouping_cols'.</dd>
 
 </dl>
 
-@anchor notes
-@par Notes
-
-See the Grail project [1] for more background on graph analytics processing
-in relational databases.
-
 @anchor examples
 @examp
 
@@ -191,7 +188,7 @@ SELECT * FROM wcc_out ORDER BY component_id, id;
 -# Now all the weakly connected components associated with each user
 using the grouping feature:
 <pre class="syntax">
-DROP TABLE IF EXISTS wcc_out, wcc_out_summary;
+DROP TABLE IF EXISTS wcc_out;
 SELECT madlib.weakly_connected_components(
                          'vertex',             -- Vertex table
                          'id',                 -- Vertix id column
@@ -202,16 +199,27 @@ SELECT madlib.weakly_connected_components(
 SELECT * FROM wcc_out ORDER BY user_id, component_id, id;
 </pre>
 <pre class="result">
- user_id | id |    component_id
----------+----+--------------------
-
+ id | component_id | user_id
+----+--------------+---------
+  0 |            0 |       1
+  1 |            0 |       1
+  2 |            0 |       1
+  3 |            0 |       1
+  5 |            0 |       1
+  6 |            0 |       1
+ 10 |           10 |       2
+ 11 |           10 |       2
+ 12 |           10 |       2
+ 13 |           10 |       2
+ 14 |           14 |       2
+ 15 |           14 |       2
+ 16 |           14 |       2
+(13 rows)
 </pre>
+Note that vertex '4' is not identified as a separate component
+in the above result. This is because disconnected nodes cannot be assigned to
+a particular group with the current graph representation in MADlib.
 
-@anchor literature
-@par Literature
-
-[1] The case against specialized graph analytics engines, J. Fan, G. Soosai 
Raj,
-and J. M. Patel. CIDR 2015. 
http://cidrdb.org/cidr2015/Papers/CIDR15_Paper20.pdf
 */
 
 -------------------------------------------------------------------------

Reply via email to