Hi Matthias,

Thanks for the very detailed explanation; it cleared up many of my questions. The point regarding AggregateOperator vs AggregateUnaryOperator is clear now. I understand how to use the optional axis metadata specified by the user in countDistinctApprox(input, axis) in AggregateUnaryOperator to implement full, row, and column aggregates (Direction.RowCol, Direction.Row, and Direction.Col, respectively). I am less clear, though, about how to implement a custom map function for countDistinct by extending ValueFunction.

Anyhow, let me take a stab at it again with a fresh mind; I'll be sure to ask on the thread if it becomes a blocker.
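
A minimal sketch of that axis-to-direction mapping, in plain Java and under stated assumptions: it uses no SystemDS classes, the names Direction, outputIndex, mapBlock, merge, and countDistinct are hypothetical, and an exact HashSet stands in for whatever sketch an approximate implementation would keep. The map step decides which output index each cell contributes to, and the merge step unions partial results that share a key, which loosely mirrors a per-block map followed by an aggregation over equal keys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class CountDistinctSketch {
    // Hypothetical stand-in for the aggregation axis (not the SystemDS enum).
    enum Direction { ROW_COL, ROW, COL }

    // Decides which output cell a value at (r, c) contributes to.
    static int outputIndex(int r, int c, Direction dir) {
        switch (dir) {
            case ROW: return r;  // one result per row
            case COL: return c;  // one result per column
            default:  return 0;  // a single result for the full matrix
        }
    }

    // "Map" step: distinct values of one block [r0, r1) x [c0, c1), per output key.
    static Map<Integer, Set<Double>> mapBlock(double[][] x, int r0, int r1,
                                              int c0, int c1, Direction dir) {
        Map<Integer, Set<Double>> partial = new HashMap<>();
        for (int r = r0; r < r1; r++)
            for (int c = c0; c < c1; c++)
                partial.computeIfAbsent(outputIndex(r, c, dir), k -> new HashSet<>())
                       .add(x[r][c]);
        return partial;
    }

    // "Aggregate" step: union the partial sets that share an output key.
    static void merge(Map<Integer, Set<Double>> total, Map<Integer, Set<Double>> partial) {
        for (Map.Entry<Integer, Set<Double>> e : partial.entrySet())
            total.computeIfAbsent(e.getKey(), k -> new HashSet<>()).addAll(e.getValue());
    }

    // Splits x into blen-by-blen blocks, maps each block, then merges by key.
    static long[] countDistinct(double[][] x, int blen, Direction dir) {
        int rows = x.length, cols = x[0].length;
        Map<Integer, Set<Double>> total = new HashMap<>();
        for (int r0 = 0; r0 < rows; r0 += blen)
            for (int c0 = 0; c0 < cols; c0 += blen)
                merge(total, mapBlock(x, r0, Math.min(r0 + blen, rows),
                                      c0, Math.min(c0 + blen, cols), dir));
        int n = dir == Direction.ROW ? rows : dir == Direction.COL ? cols : 1;
        long[] out = new long[n];
        for (Map.Entry<Integer, Set<Double>> e : total.entrySet())
            out[e.getKey()] = e.getValue().size();
        return out;
    }

    public static void main(String[] args) {
        double[][] x = {{1, 2, 2, 1}, {3, 3, 3, 3}, {1, 5, 6, 7}};
        System.out.println(Arrays.toString(countDistinct(x, 2, Direction.ROW)));     // [2, 1, 4]
        System.out.println(Arrays.toString(countDistinct(x, 2, Direction.COL)));     // [2, 3, 3, 3]
        System.out.println(Arrays.toString(countDistinct(x, 2, Direction.ROW_COL))); // [6]
    }
}
```

Note that the partial result per key has to be the set of values (or a mergeable sketch), not a count: counts from two blocks cannot be added without double-counting values that appear in both.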

Thanks,
Badrul

On Tue, Jul 20, 2021 at 3:22 AM Matthias Boehm <[email protected]> wrote:

> Thanks for asking - let's look at a concrete example: rowSums(X), which
> is compiled to the instruction opcode "uark+" (unary aggregate row Kahan
> addition). The related aggregate operator is constructed as follows:
>
>   agg = new AggregateOperator(0,
>     KahanPlus.getKahanPlusFnObject(),
>     CorrectionLocationType.LASTCOLUMN);
>   aggun = new AggregateUnaryOperator(agg,
>     ReduceCol.getReduceColFnObject(), numThreads);
>
> The aggregate operator reflects the operation (Kahan addition) and
> indicates any correction columns/rows if needed; for row Kahan addition
> it's one additional temporary column. The AggregateUnaryOperator then
> describes the direction (axis) of aggregation and additional metadata.
>
> This design stems from a time when SystemML had a generic
> implementation of such default aggregations, leveraging the aggregation
> and indexing operators to aggregate the values. However, for many years
> now we have had a dedicated kernel library 'LibMatrixAgg' which
> implements efficient dense and sparse kernels of all operators - so
> these operators are mainly used to communicate the exact operation
> parameters down to the runtime. Some of the aggregation operators are
> still internally used (e.g., Kahan addition), while the reductions are
> mostly not, so we could condense this to a single AggregateOperator. The
> reason there is an AggregateUnaryOperator is that there was also an
> AggregateBinaryOperator as used for matrix multiply, but again we added
> a 'LibMatrixMult' kernel library for performance.
>
> For your countDistinct operations, I would recommend following the
> example of existing operators (like row/colSums). For the Spark
> operations, only the map needs the full aggregation metadata as it
> determines the output indexes of blocks and thus what and how aggStable
> aggregates values for equal keys.
>
> Regards,
> Matthias
>
> On 7/20/2021 4:37 AM, Badrul Chowdhury wrote:
> > Hi,
> >
> > I am having difficulty understanding why the AggregateUnaryOperator
> > would need a corresponding AggregateOperator in the constructor. I am
> > confused by the intent behind the following:
> >
> > (Lines 69 and 73 in
> > /Users/badrulchowdhury/code/systemds/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java)
> >
> >   InstructionUtils.deriveAggregateOperatorOpcode(opcode);
> >   AggregateOperator aop = InstructionUtils.parseAggregateOperator(aopcode,
> >     corrLoc.toString());
> >
> > If I had to guess, I would say that AggregateUnaryOperator is for a
> > single matrix block (unary) whereas AggregateOperator is for combining
> > results from multiple matrix blocks (binary). At least that is what the
> > following snippet from the processMatrixAggregate() method in the same
> > file (Line 108) seems to suggest:
> >
> >   JavaRDD<MatrixBlock> out2 = out.map(
> >     new RDDUAggFunction2(*auop*, mc.getBlocksize()));
> >   MatrixBlock out3 = RDDAggregateUtils.aggStable(out2, *aggop*);
> >
> > Would really appreciate it if somebody could confirm my suspicions or
> > point me in the right direction. Thanks in advance!
> >
> > -Badrul

-- 
Cheers,
Badrul
