Baunsgaard commented on a change in pull request #1237:
URL: https://github.com/apache/systemds/pull/1237#discussion_r625831261



##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
##########
@@ -548,27 +550,54 @@ protected MatrixBlock readBlobFromFederated(FederationMap fedMap, long[] dims)
                throws IOException
        {
                // TODO sparse optimization
-               MatrixBlock ret = new MatrixBlock((int) dims[0], (int) dims[1], false);
                List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = fedMap.requestFederatedData();
                try {
-                       for (Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
-                               FederatedRange range = readResponse.getLeft();
-                               FederatedResponse response = readResponse.getRight().get();
-                               // add result
-                               int[] beginDimsInt = range.getBeginDimsInt();
-                               int[] endDimsInt = range.getEndDimsInt();
-                               MatrixBlock multRes = (MatrixBlock) response.getData()[0];
-                               ret.copy(beginDimsInt[0], endDimsInt[0] - 1,
-                                       beginDimsInt[1], endDimsInt[1] - 1, multRes, false);
-                               ret.setNonZeros(ret.getNonZeros() + multRes.getNonZeros());
+                       if ( fedMap.getType() == FederationMap.FType.PART )
+                               return aggregateResponses(readResponses);
+                       else {
+                               return bindResponses(readResponses, dims);
                        }
                }
-               catch (Exception e) {
+               catch(Exception e) {
                        throw new DMLRuntimeException("Federated matrix read failed.", e);
                }
-               
+       }
+
+       /**
+        * Bind data from federated workers based on non-overlapping federated ranges.
+        * @param readResponses responses from federated workers containing the federated ranges and data
+        * @param dims dimensions of output MatrixBlock
+        * @return MatrixBlock of consolidated data
+        * @throws Exception in case of problems with getting data from responses
+        */
+       private MatrixBlock bindResponses(List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses, long[] dims)
+       throws Exception {
+               MatrixBlock ret = new MatrixBlock((int) dims[0], (int) dims[1], false);
+               for(Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
+                       FederatedRange range = readResponse.getLeft();
+                       FederatedResponse response = readResponse.getRight().get();
+                       // add result
+                       int[] beginDimsInt = range.getBeginDimsInt();
+                       int[] endDimsInt = range.getEndDimsInt();
+                       MatrixBlock multRes = (MatrixBlock) response.getData()[0];
+                       ret.copy(beginDimsInt[0], endDimsInt[0] - 1, beginDimsInt[1], endDimsInt[1] - 1, multRes, false);
+                       ret.setNonZeros(ret.getNonZeros() + multRes.getNonZeros());
+               }
                return ret;
        }
+
+       /**
+        * Aggregate partially aggregated data from federated workers
+        * by adding values with the same index in different federated locations.
+        * @param readResponses responses from federated workers containing the federated data
+        * @return MatrixBlock of consolidated, aggregated data
+        */
+       private MatrixBlock aggregateResponses(List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses) {
+               List<Future<FederatedResponse>> dataParts = new ArrayList<>();
+               for ( Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses )
+                       dataParts.add(readResponse.getValue());
+               return FederationUtils.aggAdd(dataParts.toArray(new Future[0]));

Review comment:
       I like it; it is basically the same as what I'm doing with overlapping in compression.
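For comparison, a minimal sketch of the two consolidation paths in this hunk. It uses plain double[][] arrays instead of SystemDS's MatrixBlock, and FedConsolidationSketch with its methods is hypothetical, not SystemDS API. Binding copies each worker's block into its own disjoint range; aggregation assumes every worker returns a full-size partial result and adds cells with the same index, as the aggregateResponses javadoc describes.

```java
import java.util.List;

// Hypothetical sketch: plain double[][] arrays stand in for SystemDS MatrixBlock.
class FedConsolidationSketch {

	// A worker's block together with the 0-based top-left cell where it belongs.
	static class Part {
		final int rowBegin;
		final int colBegin;
		final double[][] data;

		Part(int rowBegin, int colBegin, double[][] data) {
			this.rowBegin = rowBegin;
			this.colBegin = colBegin;
			this.data = data;
		}
	}

	// Bind: parts cover disjoint ranges, so each one is copied into place.
	static double[][] bind(int rows, int cols, List<Part> parts) {
		double[][] ret = new double[rows][cols];
		for (Part p : parts)
			for (int i = 0; i < p.data.length; i++)
				System.arraycopy(p.data[i], 0, ret[p.rowBegin + i], p.colBegin, p.data[i].length);
		return ret;
	}

	// Aggregate: every part is a full-size partial result, so cells with the same index are added.
	static double[][] aggregate(List<double[][]> parts) {
		int rows = parts.get(0).length;
		int cols = parts.get(0)[0].length;
		double[][] ret = new double[rows][cols];
		for (double[][] p : parts)
			for (int i = 0; i < rows; i++)
				for (int j = 0; j < cols; j++)
					ret[i][j] += p[i][j];
		return ret;
	}
}
```

With two row-partitioned workers, bind places {1, 2} and {3, 4} in rows 0 and 1; with two overlapping workers, aggregate returns the cell-wise sum.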

##########
File path: src/main/java/org/apache/sysds/lops/MatMultCP.java
##########
@@ -73,15 +73,15 @@ public String toString() {
        @Override
        public String getInstructions(String input1, String input2, String output) {
                if(!useTranspose) {
-                       return InstructionUtils.concatOperands(getExecType().name(),
+                       InstructionUtils.concatBaseOperands(getExecType().name(),

Review comment:
       I don't like this change because it hides where the variable is.
   Also, I'm not sure it works, since strings in Java behave differently from other objects (they are immutable).
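The immutability concern can be checked with a small, self-contained sketch. The helper names below are hypothetical and are not InstructionUtils methods; the point is only that a Java method cannot change a caller's String by reassigning its parameter, so a string-building helper has to return its result (or write into a mutable StringBuilder), and the caller has to use that return value.

```java
// Hypothetical helpers illustrating why a String-building helper must return its result.
class StringParamSketch {

	// Reassigning the parameter only rebinds the method's local reference;
	// the caller's variable still points to the original, unchanged String.
	static void appendInPlace(String s, String suffix) {
		s = s + suffix;
	}

	// Returning the new String is the way to hand the result back to the caller.
	static String appendAndReturn(String s, String suffix) {
		return s + suffix;
	}

	// A mutable StringBuilder, in contrast, can be filled in by the callee.
	static void appendToBuilder(StringBuilder sb, String suffix) {
		sb.append(suffix);
	}
}
```

Calling appendInPlace and then reading the caller's variable shows the original value; only the returned String or the StringBuilder carries the appended text.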

##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRange.java
##########
@@ -81,6 +88,10 @@ public long getSize() {
                        size *= getSize(i);
                return size;
        }
+
+       public int getOverlapNum(){
+               return _overlapNum;

Review comment:
       In my experience, it does not matter how many overlaps you have; just have a boolean specifying whether there is overlap or not.
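If only a yes/no answer is needed, the flag can be derived directly from the ranges. A minimal sketch, assuming half-open ranges (exclusive end dims, as the endDimsInt[0] - 1 passed to copy in bindResponses suggests); OverlapCheckSketch and its Range class are hypothetical stand-ins for FederatedRange, not SystemDS code.

```java
import java.util.List;

// Hypothetical sketch: derive a single "has overlap" flag from a set of 2D ranges.
class OverlapCheckSketch {

	// Half-open range [rowBegin, rowEnd) x [colBegin, colEnd).
	static class Range {
		final long rowBegin, colBegin, rowEnd, colEnd;

		Range(long rowBegin, long colBegin, long rowEnd, long colEnd) {
			this.rowBegin = rowBegin;
			this.colBegin = colBegin;
			this.rowEnd = rowEnd;
			this.colEnd = colEnd;
		}

		// Two half-open boxes intersect iff they intersect in both dimensions.
		boolean intersects(Range o) {
			return rowBegin < o.rowEnd && o.rowBegin < rowEnd
				&& colBegin < o.colEnd && o.colBegin < colEnd;
		}
	}

	// True if any two ranges share at least one cell; how many overlaps exist is irrelevant.
	static boolean hasOverlap(List<Range> ranges) {
		for (int i = 0; i < ranges.size(); i++)
			for (int j = i + 1; j < ranges.size(); j++)
				if (ranges.get(i).intersects(ranges.get(j)))
					return true;
		return false;
	}
}
```

Row partitions such as rows [0, 5) and [5, 10) yield false, while two workers each holding the full matrix yield true.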

##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRange.java
##########
@@ -94,12 +105,19 @@ public int compareTo(FederatedRange o) {
                        if ( _beginDims[i] > o._beginDims[i])
                                return 1;
                }
+               if (_overlapNum < o._overlapNum)
+                       return -1;
+               if (_overlapNum > o._overlapNum)
+                       return 1;

Review comment:
       How is it relevant which index it is?
   If you are trying to keep the overlaps in a certain order, that is really not a needed feature.
   But then again, if it really is necessary to sort the federated ranges, I guess this is an okay way.

   If you change _overlapNum to a boolean, you could sort on object references instead in the last case.
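A sketch of the ordering with a boolean flag instead of an index. RangeOrderSketch is a hypothetical, simplified stand-in for FederatedRange (not SystemDS code): it compares begin dims first and then breaks ties with Boolean.compare, so a non-overlapping range sorts before an overlapping one with the same begin dims.

```java
// Hypothetical range ordered by begin dims, then by a boolean overlap flag.
class RangeOrderSketch implements Comparable<RangeOrderSketch> {
	final long[] beginDims;
	final boolean overlapping;

	RangeOrderSketch(long[] beginDims, boolean overlapping) {
		this.beginDims = beginDims;
		this.overlapping = overlapping;
	}

	@Override
	public int compareTo(RangeOrderSketch o) {
		// Lexicographic comparison on begin dims, as in the diff.
		for (int i = 0; i < beginDims.length; i++) {
			int c = Long.compare(beginDims[i], o.beginDims[i]);
			if (c != 0)
				return c;
		}
		// Tie-break on the flag: Boolean.compare orders false before true.
		return Boolean.compare(overlapping, o.overlapping);
	}
}
```

One caveat with this design: two overlapping ranges with identical begin dims still compare as equal, so a sorted collection such as TreeSet or TreeMap keeps only one of them, while a List sorted with Collections.sort keeps both. Java has no pointer comparison; System.identityHashCode is the closest substitute for an "object pointer" tie-break, but it is not guaranteed to be unique.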

##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
##########
@@ -50,6 +50,7 @@
                ROW, // row partitioned, groups of rows
                COL, // column partitioned, groups of columns
               FULL, // Meaning both Row and Column indicating a single federated location and a full matrix
+               PART, // Partial aggregates in several federated locations with addition as the aggregation method

Review comment:
       Call it Overlap to avoid confusion (at least for me); this overlap is a concept I have in compression as well, where I call it ... overlap.
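With the suggested rename, the values shown in this hunk could read as below. FTypeSketch and requiresAggregation() are hypothetical names used only for illustration; the real FederationMap.FType may contain further values not shown in the hunk.

```java
// Hypothetical enum mirroring the values in the hunk, with PART renamed to OVERLAP.
enum FTypeSketch {
	ROW,     // row partitioned, groups of rows
	COL,     // column partitioned, groups of columns
	FULL,    // both row and column: a single federated location holding the full matrix
	OVERLAP; // partial aggregates in several federated locations, combined by addition

	// Only OVERLAP needs worker results summed instead of bound by range.
	boolean requiresAggregation() {
		return this == OVERLAP;
	}
}
```

The read path could then branch on requiresAggregation() rather than comparing against a specific constant.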

##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
##########
@@ -189,9 +190,14 @@ public FederatedRequest broadcast(ScalarObject scalar) {
                return ret;
        }
 
+       /**
+        * Determines if the two federation maps are aligned row/column partitions
+        * at the same federated sites (which allows for purely federated operation)
+        * @param that FederationMap to check alignment with
+        * @param transposed true if that FederationMap should be transposed before checking alignment
+        * @return true if this and that FederationMap are aligned

Review comment:
       Thanks for the documentation!!!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

