This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new f81b76de74 [SYSTEMDS-3708] Additional sort-merge raJoin method
f81b76de74 is described below

commit f81b76de742b1815f949c9d2aa9a13eb752499f2
Author: gghsu <[email protected]>
AuthorDate: Sat Jul 6 20:13:19 2024 +0200

    [SYSTEMDS-3708] Additional sort-merge raJoin method
    
    LDE project SoSe'24, part III.
    Closes 2044.
---
 scripts/builtin/raJoin.dml                         | 118 ++++++++++++++++++---
 .../functions/builtin/part2/BuiltinRaJoinTest.java |   6 +-
 src/test/scripts/functions/builtin/raJoin.dml      |   2 +-
 3 files changed, 111 insertions(+), 15 deletions(-)

diff --git a/scripts/builtin/raJoin.dml b/scripts/builtin/raJoin.dml
index 333a1f3e8d..b7b299e7a8 100644
--- a/scripts/builtin/raJoin.dml
+++ b/scripts/builtin/raJoin.dml
@@ -28,7 +28,7 @@
 # colA      Integer indicating the column index of matrix A to execute inner join command
 # B         Matrix of right left data [shape: N x M]
 # colA      Integer indicating the column index of matrix B to execute inner join command
-# method    Join implementation method (nested-loop)
+# method    Join implementation method (nested-loop, sort-merge)
 # ------------------------------------------------------------------------------
 #
 # OUTPUT:
@@ -37,21 +37,115 @@
 # ------------------------------------------------------------------------------
 
 m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B,
-  Integer colB, String method="nested-loop")
+  Integer colB, String method="sort-merge")
   return (Matrix[Double] Y)
 {
-  # matrix of result data
-  Y = matrix(0, rows=0, cols=ncol(A) + ncol(B) )
-
-  for (i in 1:nrow(A)) {
-    for (j in 1:nrow(B)) {
-      if (as.scalar(A[i, colA] == B[j, colB])) {
-        # Combine the matching row from A and B to match
-        match = cbind(A[i,], B[j,])
-        # merge the match row into result Y
-        Y = rbind(Y, match)
+  # Sort the input Matrix with specific column in order to ensure same output order
+  A = order(target = A, by = colA, decreasing=FALSE, index.return=FALSE)
+  B = order(target = B, by = colB, decreasing=FALSE, index.return=FALSE)
+
+  if (method == "nested-loop") {
+    # matrix of result data
+    Y = matrix(0, rows=0, cols=ncol(A) + ncol(B) )
+
+    for (i in 1:nrow(A)) {
+      for (j in 1:nrow(B)) {
+        if (as.scalar(A[i, colA] == B[j, colB])) {
+          # Combine the matching row from A and B to match
+          match = cbind(A[i,], B[j,])
+          # merge the match row into result Y
+          Y = rbind(Y, match)
+        }
+      }
+    }
+  }
+  else if (method == "sort-merge") {
+    # get join key columns
+    left = A[, colA]
+    right = B[, colB]
+
+    # Sort join keys
+    leftIdx = order(target = left, decreasing=FALSE)
+    rightIdx = order(target = right, decreasing=FALSE)
+
+    # Ensure histograms are aligned by creating a common set of keys
+    commonKeys = max(max(left), max(right));
+
+    # Build histograms for the left and right key columns
+    leftHist = table(left, 1, commonKeys, 1)
+    rightHist = table(right, 1, commonKeys, 1)
+
+    # Compute the number of rows for each pair of matching keys
+    histMul = leftHist * rightHist
+
+    # Compute the prefix sums of histograms
+    cumLeftHist = cumsum(leftHist)
+    cumRightHist = cumsum(rightHist)
+    cumHistMul = cumsum(histMul)
+
+    # Initialize the output size and output offsets
+    outSize = cumHistMul[nrow(cumHistMul), 1]
+    if(as.scalar(outSize > 0)) {
+      offset = seq(1, as.scalar(outSize), 1)
+
+      # Find the bucket of matching keys to which each output belongs
+      outBucket = parallelBinarySearch(offset, cumHistMul)
+
+      # Determine the number of rows in outBucket
+      num_rows = nrow(outBucket)
+
+      # Initialize a matrix to store the result
+      updatedoffset = matrix(0, rows=num_rows, cols=1)
+      leftOutIdx = matrix(0, rows=num_rows, cols=1)
+      rightOutIdx = matrix(0, rows=num_rows, cols=1)
+
+      # Compute the element-wise subtraction and store in result
+      # TODO performance - try avoid iterating over rows
+      for(i in 1:num_rows) {
+        updatedoffset[i, 1] = offset[i, 1] - (cumHistMul[as.scalar(outBucket[i, 1]), 1] - histMul[as.scalar(outBucket[i, 1]), 1]) -1
+        leftOutIdx[i, 1] = as.scalar(cumLeftHist[as.scalar(outBucket[i, 1]), 1] - leftHist[as.scalar(outBucket[i, 1]), 1] + floor(updatedoffset[i, 1] / rightHist[as.scalar(outBucket[i, 1]), 1])) +1
+        rightOutIdx[i, 1] = as.scalar(cumRightHist[as.scalar(outBucket[i, 1]), 1] - rightHist[as.scalar(outBucket[i, 1]), 1] + (updatedoffset[i, 1] %% rightHist[as.scalar(outBucket[i, 1]), 1])) +1
       }
+
+      nrows = length(offset)
+      ncolsA = ncol(A)
+      ncolsB = ncol(B)
+      Y = matrix(0, rows=nrows, cols=ncolsA + ncolsB)
+
+      # Populate the output matrix Y
+      for (i in 1:nrows) {
+        Y[i, 1:ncolsA] = A[as.scalar(leftOutIdx[i, 1]), ]
+        Y[i, (ncolsA + 1):(ncolsA + ncolsB)] = B[as.scalar(rightOutIdx[i, 1]), ]
+      }
+    }
+    # TODO hash-based method which constructs permutation tables to replicate
+    # tuples of lhs and rhs and simply concatenates these tuples via cbind
+    else{
+      Y = matrix(0, rows=0, cols=1)
     }
   }
 }
 
+# Function to perform parallel binary search
+parallelBinarySearch = function (Matrix[double] offset, Matrix[double] cumHistMul)
+    return (Matrix[double] matched_result)
+{
+  n = nrow(cumHistMul)
+  result = matrix(0, rows=nrow(offset), cols=1)
+  for (i in 1:nrow(offset)) {
+    low = 1
+    high = n
+    while (low <= high) {
+      mid = as.integer((low + high) / 2)
+      if ( as.scalar(offset[i] <= cumHistMul[mid]) ) {
+        result[i] = mid
+        high = mid - 1
+      } else {
+        low = mid + 1
+      }
+    }
+  }
+
+  matched_result = result
+}
+
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
index 6c5ea9d8ac..f19262b854 100644
--- a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
@@ -41,6 +41,8 @@ public class BuiltinRaJoinTest extends AutomatedTestBase
                addTestConfiguration(TEST_NAME,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,new String[]{"result"}));
        }
 
+       // TODO test all join methods
+       
        @Test
        public void testRaJoinTest() {
                //generate actual dataset and variables
@@ -64,9 +66,9 @@ public class BuiltinRaJoinTest extends AutomatedTestBase
                // Expected output matrix
                double[][] Y = {
                                {1, 2, 3, 1, 2, 9},
+                               {1, 3, 6, 1, 2, 9},
                                {4, 7, 8, 4, 7, 8},
                                {4, 7, 8, 4, 5, 10},
-                               {1, 3, 6, 1, 2, 9},
                                {4, 3, 5, 4, 7, 8},
                                {4, 3, 5, 4, 5, 10},
                };
@@ -107,7 +109,7 @@ public class BuiltinRaJoinTest extends AutomatedTestBase
                double[][] A = {
                                {1, 2, 3, 4, 5},
                                {6, 7, 8, 9, 10},
-                               {11, 12, 13, 14, 15},
+                               {11, 12, 13, 14, 8},
                                {16, 17, 18, 19, 20},
                                {21, 22, 23, 24, 25}
                };
diff --git a/src/test/scripts/functions/builtin/raJoin.dml b/src/test/scripts/functions/builtin/raJoin.dml
index 08483b8ea8..63aa2807c8 100644
--- a/src/test/scripts/functions/builtin/raJoin.dml
+++ b/src/test/scripts/functions/builtin/raJoin.dml
@@ -24,6 +24,6 @@ colA = as.integer($2)
 B = read($3)
 colB = as.integer($4)
 
-result = raJoin(A, colA, B, colB, "nested-loop");
+result = raJoin(A, colA, B, colB, "sort-merge");
 write(result, $5);
 

Reply via email to