Baunsgaard commented on a change in pull request #1237:
URL: https://github.com/apache/systemds/pull/1237#discussion_r625831261
##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
##########
@@ -548,27 +550,54 @@ protected MatrixBlock readBlobFromFederated(FederationMap fedMap, long[] dims)
throws IOException
{
// TODO sparse optimization
- MatrixBlock ret = new MatrixBlock((int) dims[0], (int) dims[1], false);
List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = fedMap.requestFederatedData();
try {
- for (Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
- FederatedRange range = readResponse.getLeft();
- FederatedResponse response = readResponse.getRight().get();
- // add result
- int[] beginDimsInt = range.getBeginDimsInt();
- int[] endDimsInt = range.getEndDimsInt();
- MatrixBlock multRes = (MatrixBlock) response.getData()[0];
- ret.copy(beginDimsInt[0], endDimsInt[0] - 1,
- beginDimsInt[1], endDimsInt[1] - 1, multRes, false);
- ret.setNonZeros(ret.getNonZeros() + multRes.getNonZeros());
+ if ( fedMap.getType() == FederationMap.FType.PART )
+ return aggregateResponses(readResponses);
+ else {
+ return bindResponses(readResponses, dims);
}
}
- catch (Exception e) {
+ catch(Exception e) {
throw new DMLRuntimeException("Federated matrix read failed.", e);
}
-
+ }
+
+ /**
+ * Bind data from federated workers based on non-overlapping federated ranges.
+ * @param readResponses responses from federated workers containing the federated ranges and data
+ * @param dims dimensions of output MatrixBlock
+ * @return MatrixBlock of consolidated data
+ * @throws Exception in case of problems with getting data from responses
+ */
+ private MatrixBlock bindResponses(List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses, long[] dims)
+ throws Exception {
+ MatrixBlock ret = new MatrixBlock((int) dims[0], (int) dims[1], false);
+ for(Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
+ FederatedRange range = readResponse.getLeft();
+ FederatedResponse response = readResponse.getRight().get();
+ // add result
+ int[] beginDimsInt = range.getBeginDimsInt();
+ int[] endDimsInt = range.getEndDimsInt();
+ MatrixBlock multRes = (MatrixBlock) response.getData()[0];
+ ret.copy(beginDimsInt[0], endDimsInt[0] - 1, beginDimsInt[1], endDimsInt[1] - 1, multRes, false);
+ ret.setNonZeros(ret.getNonZeros() + multRes.getNonZeros());
+ }
return ret;
}
+
+ /**
+ * Aggregate partially aggregated data from federated workers
+ * by adding values with the same index in different federated locations.
+ * @param readResponses responses from federated workers containing the federated data
+ * @return MatrixBlock of consolidated, aggregated data
+ */
+ private MatrixBlock aggregateResponses(List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses) {
+ List<Future<FederatedResponse>> dataParts = new ArrayList<>();
+ for ( Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses )
+ dataParts.add(readResponse.getValue());
+ return FederationUtils.aggAdd(dataParts.toArray(new Future[0]));
Review comment:
I like it; it is basically the same as what I'm doing with overlapping in
compression.
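To make the two consolidation modes concrete, here is a minimal sketch, assuming plain `double[][]` arrays in place of `MatrixBlock`. The class `FederatedReadSketch`, the `Part` holder, and both method names are hypothetical stand-ins for illustration and are not SystemDS APIs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in: double[][] replaces MatrixBlock; none of these names are SystemDS APIs.
class FederatedReadSketch {

    /** One worker response: top-left offset of its range plus the data it holds. */
    static final class Part {
        final int rowOffset;
        final int colOffset;
        final double[][] data;

        Part(int rowOffset, int colOffset, double[][] data) {
            this.rowOffset = rowOffset;
            this.colOffset = colOffset;
            this.data = data;
        }
    }

    /** Non-overlapping ranges: copy each part into its own region of the output. */
    static double[][] bind(List<Part> parts, int rows, int cols) {
        double[][] out = new double[rows][cols];
        for (Part p : parts)
            for (int i = 0; i < p.data.length; i++)
                System.arraycopy(p.data[i], 0, out[p.rowOffset + i], p.colOffset, p.data[i].length);
        return out;
    }

    /** Overlapping partial aggregates: each part spans the full output, so add cell by cell. */
    static double[][] aggregateAdd(List<Part> parts, int rows, int cols) {
        double[][] out = new double[rows][cols];
        for (Part p : parts)
            for (int i = 0; i < rows; i++)
                for (int j = 0; j < cols; j++)
                    out[i][j] += p.data[i][j];
        return out;
    }

    public static void main(String[] args) {
        // Two row-partitioned parts that tile a 2x2 output.
        List<Part> rowParts = Arrays.asList(
            new Part(0, 0, new double[][] {{1, 2}}),
            new Part(1, 0, new double[][] {{3, 4}}));
        System.out.println(Arrays.deepToString(bind(rowParts, 2, 2)));

        // Two overlapping parts that both cover the same 1x2 output.
        List<Part> overlapping = Arrays.asList(
            new Part(0, 0, new double[][] {{1, 2}}),
            new Part(0, 0, new double[][] {{3, 4}}));
        System.out.println(Arrays.deepToString(aggregateAdd(overlapping, 1, 2)));
    }
}
```

The point of the split is that binding needs the range offsets while overlap aggregation ignores them, which is why only the bind path takes the output dimensions from the ranges.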
##########
File path: src/main/java/org/apache/sysds/lops/MatMultCP.java
##########
@@ -73,15 +73,15 @@ public String toString() {
@Override
public String getInstructions(String input1, String input2, String output) {
if(!useTranspose) {
- return InstructionUtils.concatOperands(getExecType().name(),
+ InstructionUtils.concatBaseOperands(getExecType().name(),
Review comment:
I don't like this change because it hides where the variable is.
Also, I'm not sure it works, since strings behave differently in Java than
other variables (they are immutable).
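To illustrate the immutability concern in isolation, here is a small sketch. It makes no claim about what `concatBaseOperands` actually does in this PR; the class `StringImmutabilitySketch` and its three helpers are hypothetical names used only to show how `String` and `StringBuilder` arguments behave in Java.

```java
// Hypothetical helpers; they only demonstrate Java argument semantics, not SystemDS code.
class StringImmutabilitySketch {

    /** Reassigning a String parameter only rebinds the local reference. */
    static void appendInPlace(String target, String suffix) {
        target = target + suffix; // creates a new String; the caller's variable is unchanged
    }

    /** Returning the result keeps the data flow visible at the call site. */
    static String appendAndReturn(String target, String suffix) {
        return target + suffix;
    }

    /** A StringBuilder is mutable, so changes made here are visible to the caller. */
    static void appendToBuilder(StringBuilder target, String suffix) {
        target.append(suffix);
    }

    public static void main(String[] args) {
        String s = "CP";
        appendInPlace(s, ",ba+*");
        System.out.println(s); // still the original value

        System.out.println(appendAndReturn(s, ",ba+*"));

        StringBuilder sb = new StringBuilder("CP");
        appendToBuilder(sb, ",ba+*");
        System.out.println(sb);
    }
}
```

If the helper writes into a shared mutable buffer, it works, but the reader can no longer see from the call site which variable receives the result; returning the string avoids both problems.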
##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRange.java
##########
@@ -81,6 +88,10 @@ public long getSize() {
size *= getSize(i);
return size;
}
+
+ public int getOverlapNum(){
+ return _overlapNum;
Review comment:
In my experience it does not matter how many overlaps you have; just
have a boolean specifying whether there is overlap or not.
##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRange.java
##########
@@ -94,12 +105,19 @@ public int compareTo(FederatedRange o) {
if ( _beginDims[i] > o._beginDims[i])
return 1;
}
+ if (_overlapNum < o._overlapNum)
+ return -1;
+ if (_overlapNum > o._overlapNum)
+ return 1;
Review comment:
How is it relevant which index it is?
If you are trying to keep the overlaps in a certain order, that is really not
a needed feature.
But then again, if it really is necessary to sort the federated ranges, I
guess this is an okay way.
If you change _overlapNum to a boolean, you could sort on object pointers
instead in the last case.
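A rough sketch of that alternative, assuming a simplified range with only begin indices and a boolean flag. `RangeSketch` and its fields are hypothetical; since Java cannot compare object pointers directly, `System.identityHashCode` is used here as an approximation, and identity hashes are not guaranteed to be unique.

```java
// Hypothetical simplified range; not the FederatedRange class from the PR.
class RangeSketch implements Comparable<RangeSketch> {
    final long[] beginDims;
    final boolean overlap; // assumed replacement for the _overlapNum counter

    RangeSketch(long[] beginDims, boolean overlap) {
        this.beginDims = beginDims;
        this.overlap = overlap;
    }

    @Override
    public int compareTo(RangeSketch o) {
        if (this == o)
            return 0;
        // Primary order: begin index per dimension, as in the existing comparison.
        for (int i = 0; i < beginDims.length; i++) {
            int c = Long.compare(beginDims[i], o.beginDims[i]);
            if (c != 0)
                return c;
        }
        // Same begin index: ranges without overlap sort before overlapping ones.
        int c = Boolean.compare(overlap, o.overlap);
        if (c != 0)
            return c;
        // Last resort: order by object identity so distinct objects rarely compare as equal.
        // Identity hashes can collide, so this is a sketch rather than a strict total order.
        return Integer.compare(System.identityHashCode(this), System.identityHashCode(o));
    }
}
```

The identity tie-break only matters if the ranges are used as keys in a sorted collection, where two distinct ranges comparing as equal would collapse into one entry.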
##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
##########
@@ -50,6 +50,7 @@
ROW, // row partitioned, groups of rows
COL, // column partitioned, groups of columns
FULL, // Meaning both Row and Column indicating a single federated location and a full matrix
+ PART, // Partial aggregates in several federated locations with addition as the aggregation method
Review comment:
Call it Overlap, so as not to confuse (at least me); overlap is a
concept I have in compression as well, where I call it ... overlap.
##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
##########
@@ -189,9 +190,14 @@ public FederatedRequest broadcast(ScalarObject scalar) {
return ret;
}
+ /**
+ * Determines if the two federation maps are aligned row/column partitions
+ * at the same federated sites (which allows for purely federated operation)
+ * @param that FederationMap to check alignment with
+ * @param transposed true if that FederationMap should be transposed before checking alignment
+ * @return true if this and that FederationMap are aligned
Review comment:
Thanks for the documentation!