sounds good - if it turns out countDistinctApprox is to hard to
integrate into the default aggregate unary functions, feel free to add
specialized branches (with custom spark functions) or even a dedicated
instruction opcode. From my perspective this would make sense because
the output of the block aggregation is a KMV or HyperLogLog sketch.
Regards,
Matthias
On 7/21/2021 5:54 AM, Badrul Chowdhury wrote:
Hi Matthias,
Thanks for the very detailed explanation- it was very useful in clearing up
many questions. The point regarding AggregateOperator vs
AggregateUnaryOperator is clear now. I understand how to use the optional
axis metadata specified by the user in countDistinctApprox(input, axis) in
AggregateUnaryOperator to implement full, row, and column aggregates
(Direction.RowCol, and Direction.Row, and Direction.Col, respectively). I
am less clear about how to implement a custom map function for
countDistinct by extending ValueFunction, though. Anyhow, let me take a
stab at it again with a fresh mind, I'll be sure to ask on the thread if it
becomes a blocker.
Thanks,
Badrul
On Tue, Jul 20, 2021 at 3:22 AM Matthias Boehm <[email protected]> wrote:
thanks for asking - let's look at a concrete example: rowSums(X), which
is compiled to the instruction opcode "uark+" (unary aggregate row kahan
addition). The related aggregate operator is constructed as follows:
agg = new AggregateOperator(0,
KahanPlus.getKahanPlusFnObject(),
CorrectionLocationType.LASTCOLUMN);
aggun = new AggregateUnaryOperator(agg,
ReduceCol.getReduceColFnObject(), numThreads);
The aggregate operator reflects the operation (kahan addition, and
indicates any correction columns/rows if needed - for row kahan addition
its one additional temporay column). The AggregateUnaryOperator then
describes the direction (axis) of aggregation and additional meta data.
This design stems from a time, where SystemML had a generic
implementation of such default aggregations, leveraging the aggregation
and indexing operators to obtain the aggregate the values. However, for
many years now we have a dedicate kernel library 'LibMatrixAgg' which
implements efficient dense and sparse kernels of all operators - so
these operators are mainly used to communicate the exact operation
parameters down to the runtime. Some of the aggregation operators are
still internally used (e.g., Kahan addition), while the reductions are
mostly not, so we could condense this to a single AggregateOperator. The
reason there is a AggregateUnaryOperator is that there was also a
AggregateBinaryOperator as used for matrix multiply, but again we added
a 'LibMatrixMult' kernel library for performance.
For your countDistinct operations, I would recommend to to follow the
example of existing operators (like row/colSums). For the spark
operations, only the map needs the full aggregation meta data as it
determines the output indexes of blocks and thus what and how aggStable
aggregates values for equal keys.
Regards,
Matthias
On 7/20/2021 4:37 AM, Badrul Chowdhury wrote:
Hi,
I am having difficulty understanding why the AggregateUnaryOperator would
need a corresponding AggregateOperator in the constructor. I am confused
by
the intent behind the following:
(Lines 69 and 73 in
/Users/badrulchowdhury/code/systemds/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java)
InstructionUtils.deriveAggregateOperatorOpcode(opcode);
AggregateOperator aop = InstructionUtils.parseAggregateOperator(aopcode,
corrLoc.toString());
If I had to guess, I would say that UnaryAggregateOperator is for a
single
matrix block (unary) whereas AggregateOperator is for combining results
from multiple matrix blocks (binary). At least that is what the following
snippet from the processMatrixAggregate() method in the same file (Line
108) seems to suggest:
JavaRDD<MatrixBlock> out2 = out.map(
new RDDUAggFunction2(*auop*, mc.getBlocksize()));
MatrixBlock out3 = RDDAggregateUtils.aggStable(out2, *aggop*);
Would really appreciate it if somebody could confirm my suspicions or
point
me in the right direction. Thanks in advance!
-Badrul