Thanks for asking. Let's look at a concrete example: rowSums(X), which is compiled to the instruction opcode "uark+" (unary aggregate row Kahan addition). The related aggregate operator is constructed as follows:

agg = new AggregateOperator(0,
    KahanPlus.getKahanPlusFnObject(),
    CorrectionLocationType.LASTCOLUMN);
aggun = new AggregateUnaryOperator(agg,
    ReduceCol.getReduceColFnObject(), numThreads);

The AggregateOperator reflects the operation (Kahan addition) and indicates any correction columns/rows if needed; for row Kahan addition, that is one additional temporary column. The AggregateUnaryOperator then describes the direction (axis) of aggregation and additional metadata.
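To make the role of the correction column concrete, here is a minimal standalone sketch of row-wise Kahan addition. It is not the SystemDS kernel: the class and method names (KahanRowSumsSketch, rowSumsWithCorrection, dropCorrection) are made up for illustration, and it uses the textbook form of compensated summation, which may differ in detail from what KahanPlus does internally.

```java
// Illustrative sketch only; these names are hypothetical and not part of SystemDS.
class KahanRowSumsSketch {

    // Returns one output row per input row: column 0 holds the running sum,
    // and the last column holds the Kahan correction term.
    static double[][] rowSumsWithCorrection(double[][] x) {
        double[][] out = new double[x.length][2];
        for (int i = 0; i < x.length; i++) {
            double sum = 0.0;
            double corr = 0.0; // rounding error accumulated so far
            for (double v : x[i]) {
                double y = v - corr;  // compensate for the error of earlier additions
                double t = sum + y;   // low-order bits of y may be lost here
                corr = (t - sum) - y; // recover the part of y that was lost
                sum = t;
            }
            out[i][0] = sum;
            out[i][1] = corr;
        }
        return out;
    }

    // Drops the temporary correction column to obtain the final row sums.
    static double[] dropCorrection(double[][] agg) {
        double[] result = new double[agg.length];
        for (int i = 0; i < agg.length; i++) {
            result[i] = agg[i][0];
        }
        return result;
    }
}
```

The intermediate result is one column wider than the final output, which is the kind of information the correction location (here, the last column) has to convey.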

This design stems from a time when SystemML had a generic implementation of such default aggregations, leveraging the aggregation and indexing operators to obtain the aggregated values. However, for many years now we have had a dedicated kernel library, 'LibMatrixAgg', which implements efficient dense and sparse kernels for all operators, so these operators are mainly used to communicate the exact operation parameters down to the runtime. Some of the aggregation operators are still used internally (e.g., Kahan addition), while the reductions are mostly not, so we could condense this to a single AggregateOperator. The reason there is an AggregateUnaryOperator is that there was also an AggregateBinaryOperator, as used for matrix multiply, but again we added a 'LibMatrixMult' kernel library for performance.

For your countDistinct operations, I would recommend following the example of existing operators (like row/colSums). For the Spark operations, only the map needs the full aggregation metadata, as it determines the output indexes of blocks and thus which values aggStable aggregates for equal keys, and how.
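As a rough illustration of that split, the following sketch mimics the two steps in plain Java without Spark. All names here (BlockRowSumsSketch, Block, mapRowSums, aggregateByKey) are hypothetical and only meant to show the idea: the map step knows the reduction direction and therefore sets the output block index, while the aggregation step only combines partial results that share a key. Plain addition is used for brevity instead of Kahan addition.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; all names are hypothetical and not SystemDS APIs.
class BlockRowSumsSketch {

    // A matrix block together with its (1-based) block indexes.
    static class Block {
        final int rowIndex;
        final int colIndex;
        final double[][] values;

        Block(int rowIndex, int colIndex, double[][] values) {
            this.rowIndex = rowIndex;
            this.colIndex = colIndex;
            this.values = values;
        }
    }

    // "Map" step: needs the reduction direction. For row sums, every block
    // in a block row maps to output column index 1.
    static Block mapRowSums(Block in) {
        double[][] out = new double[in.values.length][1];
        for (int i = 0; i < in.values.length; i++) {
            for (double v : in.values[i]) {
                out[i][0] += v;
            }
        }
        return new Block(in.rowIndex, 1, out);
    }

    // "Aggregate" step: only needs the aggregation function (here, addition)
    // and merges all partial blocks that ended up with the same output key.
    static Map<String, double[][]> aggregateByKey(List<Block> partials) {
        Map<String, double[][]> result = new HashMap<>();
        for (Block p : partials) {
            String key = p.rowIndex + "," + p.colIndex;
            double[][] acc = result.get(key);
            if (acc == null) {
                // Copy the first partial so the input blocks are not mutated.
                acc = new double[p.values.length][1];
                result.put(key, acc);
            }
            for (int i = 0; i < p.values.length; i++) {
                acc[i][0] += p.values[i][0];
            }
        }
        return result;
    }
}
```

With blocks (1,1) and (1,2) as input, both partial results carry the key "1,1" after the map step, so the aggregation step adds them cell by cell without having to know that the operation was a row aggregation.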

Regards,
Matthias

On 7/20/2021 4:37 AM, Badrul Chowdhury wrote:
Hi,

I am having difficulty understanding why the AggregateUnaryOperator would
need a corresponding AggregateOperator in the constructor. I am confused by
the intent behind the following:

(Lines 69 and 73 in
/Users/badrulchowdhury/code/systemds/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java)
InstructionUtils.deriveAggregateOperatorOpcode(opcode);
AggregateOperator aop = InstructionUtils.parseAggregateOperator(aopcode,
corrLoc.toString());

If I had to guess, I would say that UnaryAggregateOperator is for a single
matrix block (unary) whereas AggregateOperator is for combining results
from multiple matrix blocks (binary). At least that is what the following
snippet from the processMatrixAggregate() method in the same file (Line
108) seems to suggest:

    JavaRDD<MatrixBlock> out2 = out.map(
        new RDDUAggFunction2(auop, mc.getBlocksize()));
    MatrixBlock out3 = RDDAggregateUtils.aggStable(out2, aggop);

Would really appreciate it if somebody could confirm my suspicions or point
me in the right direction. Thanks in advance!

-Badrul
