Hi, Shawn

> I was hoping someone could give me a high level overview of how Calcite is
> executing the window function.

At runtime, Calcite generates Java code and compiles it with Janino. For the
enumerable window operator (EnumerableWindow), there is a sample of the
generated code in
*org.apache.calcite.adapter.enumerable.EnumerableWindow#sampleOfTheGeneratedWindowedAggregate()*,
which may be what you need.

In general, input rows are first organized into an
*org.apache.calcite.runtime.SortedMultiMap* keyed by the partitioning key,
and then all of the windowed aggregate functions are evaluated over each
list of rows that share the same partitioning key.
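
A rough sketch of that two-step flow, under stated assumptions: a
java.util.TreeMap of lists stands in for SortedMultiMap, only ROW_NUMBER()
is evaluated, ordering within a partition is ignored, and the names
PartitionedRowNumber, rowNumber and Numbered are hypothetical, not Calcite's
generated code. Note that every input row is buffered before any output is
produced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical sketch, not Calcite's generated code: a TreeMap of lists
// stands in for org.apache.calcite.runtime.SortedMultiMap.
public class PartitionedRowNumber {

  // One output row: the input row plus its ROW_NUMBER() value.
  public static final class Numbered<R> {
    public final R row;
    public final long rowNumber;

    Numbered(R row, long rowNumber) {
      this.row = row;
      this.rowNumber = rowNumber;
    }
  }

  public static <R, K extends Comparable<K>> List<Numbered<R>> rowNumber(
      Iterable<R> input, Function<R, K> partitionKey) {
    // Step 1: buffer every input row, grouped by partitioning key.
    // With an empty PARTITION BY, every row maps to the same key, so
    // the entire input is held in one in-memory list.
    Map<K, List<R>> partitions = new TreeMap<>();
    for (R row : input) {
      partitions.computeIfAbsent(partitionKey.apply(row), k -> new ArrayList<>())
          .add(row);
    }
    // Step 2: evaluate the windowed aggregate over each partition's rows,
    // restarting the counter for every partition.
    List<Numbered<R>> out = new ArrayList<>();
    for (List<R> rows : partitions.values()) {
      long n = 0;
      for (R row : rows) {
        out.add(new Numbered<>(row, ++n));
      }
    }
    return out;
  }
}
```

For example, rowNumber(rows, r -> 0) models row_number() over () with
"partition {}" as in your plan: a single partition holding every row,
numbered 1..n in input order.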

> I would expect it to have to iterate through the data at least once for the
> window and once for the data itself.

Could you elaborate?
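
If the expectation is that row_number() over () should not need to hold the
whole input, here is a contrasting sketch: with no partitioning and no
ordering ("partition {} order by []" in your plan), row numbers could in
principle be assigned in one streaming pass with only a counter. This is not
how EnumerableWindow is implemented, it only covers this special case, and
StreamingRowNumber, rowNumber and Numbered are hypothetical names.

```java
import java.util.Iterator;

// Hypothetical sketch, not Calcite's implementation: ROW_NUMBER() OVER ()
// with no PARTITION BY and no ORDER BY, computed in a single streaming
// pass. Only a counter is kept; no input rows are buffered.
public class StreamingRowNumber {

  // One output row: the input row plus its ROW_NUMBER() value.
  public static final class Numbered<R> {
    public final R row;
    public final long rowNumber;

    Numbered(R row, long rowNumber) {
      this.row = row;
      this.rowNumber = rowNumber;
    }
  }

  // Lazily numbers rows as the caller pulls them from the returned iterator.
  public static <R> Iterator<Numbered<R>> rowNumber(Iterator<R> input) {
    return new Iterator<Numbered<R>>() {
      private long counter = 0;

      @Override
      public boolean hasNext() {
        return input.hasNext();
      }

      @Override
      public Numbered<R> next() {
        // input.next() throws NoSuchElementException when input is exhausted.
        return new Numbered<>(input.next(), ++counter);
      }
    };
  }
}
```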

Best,
Feng

Shawn Weeks <[email protected]> wrote on Wednesday, January 1, 2020 at 1:23 AM:

> Hi, I’m trying to troubleshoot an issue with the Apache NiFi project's use
> of Calcite to allow queries against flow files. NiFi presents an enumerable
> interface to Calcite (org.apache.calcite.linq4j.EnumerableDefaults). For
> queries with a simple where clause and aggregations everything works great,
> but when I use window analytic functions like row_number against larger
> sets of records (2 GB) the process uses all memory allocated and every CPU
> core. I’ve attached a profiler and I’m only ever seeing 2-3 MB in live
> bytes, and most of the CPU usage is NiFi parsing the records.
>
> I was hoping someone could give me a high level overview of how Calcite is
> executing the window function. I would expect it to have to iterate through
> the data at least once for the window and once for the data itself.
>
> Here is the query being executed and a trace of the Calcite execution plan.
>
> select  "antiNucleus",
>         "eventFile",
>         "eventNumber",
>         "eventTime",
>         "histFile",
>         "multiplicity",
>         "NaboveLb",
>         "NbelowLb",
>         "NLb",
>         "primaryTracks",
>         "prodTime",
>         "Pt",
>         "runNumber",
>         "vertexX",
>         "vertexY",
>         "vertexZ",
>         row_number() over() id
>     from flowfile;
>
> 7199 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - Root:
> rel#10:Subset#1.ENUMERABLE.[]
> Original rel:
> LogicalProject(subset=[rel#10:Subset#1.ENUMERABLE.[]], antiNucleus=[$0],
> eventFile=[$1], eventNumber=[$2], eventTime=[$3], histFile=[$4],
> multiplicity=[$5], NaboveLb=[$6], NbelowLb=[$7], NLb=[$8],
> primaryTracks=[$9], prodTime=[$10], Pt=[$11], runNumber=[$12],
> vertexX=[$13], vertexY=[$14], vertexZ=[$15], id=[ROW_NUMBER() OVER (ROWS
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]): rowcount = 100.0,
> cumulative cost = {100.0 rows, 1700.0 cpu, 0.0 io}, id = 7
>   FlowFileTableScan(subset=[rel#6:Subset#0.ENUMERABLE.[]],
> table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
> 14, 15]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0
> io}, id = 0
>
> Sets:
> Set#0, type: RecordType(JavaType(class java.lang.String) antiNucleus,
> JavaType(class java.lang.String) eventFile, JavaType(class
> java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
> JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
> multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
> java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
> JavaType(class java.lang.String) primaryTracks, JavaType(class
> java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
> JavaType(class java.lang.String) runNumber, JavaType(class
> java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
> JavaType(class java.lang.String) vertexZ)
>         rel#6:Subset#0.ENUMERABLE.[], best=rel#0, importance=0.81
>
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
> cost={100.0 rows, 101.0 cpu, 0.0 io}
> Set#1, type: RecordType(JavaType(class java.lang.String) antiNucleus,
> JavaType(class java.lang.String) eventFile, JavaType(class
> java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
> JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
> multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
> java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
> JavaType(class java.lang.String) primaryTracks, JavaType(class
> java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
> JavaType(class java.lang.String) runNumber, JavaType(class
> java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
> JavaType(class java.lang.String) vertexZ, BIGINT id)
>         rel#8:Subset#1.NONE.[], best=null, importance=0.9
>
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
> OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)), rowcount=100.0,
> cumulative cost={inf}
>
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={inf}
>         rel#10:Subset#1.ENUMERABLE.[], best=rel#15, importance=1.0
>
> rel#11:AbstractConverter.ENUMERABLE.[](input=rel#8:Subset#1.NONE.[],convention=ENUMERABLE,sort=[]),
> rowcount=100.0, cumulative cost={inf}
>
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
> 0.0 io}
>
>
> 7200 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> EnumerableWindow#16
> 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
> Cheapest plan:
> EnumerableWindow(window#0=[window(partition {} order by [] rows between
> UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]): rowcount =
> 100.0, cumulative cost = {200.0 rows, 301.0 cpu, 0.0 io}, id = 16
>   FlowFileTableScan(table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7,
> 8, 9, 10, 11, 12, 13, 14, 15]]): rowcount = 100.0, cumulative cost = {100.0
> rows, 101.0 cpu, 0.0 io}, id = 0
>
> 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
> Provenance:
> EnumerableWindow#16
>   direct
>
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()]))
>       call#112 rule [EnumerableWindowRule]
>
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()]))
>           call#64 rule [ProjectToWindowRule:project]
>
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
> OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
>               no parent
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
>   no parent
>
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> HepRelVertex#17
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> EnumerableWindow#18
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> HepRelVertex#19
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> Breadth-first from root:  {
>     HepRelVertex#19 =
> rel#18:EnumerableWindow.ENUMERABLE.[[]](input=HepRelVertex#17,window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
> 0.0 io}
>     HepRelVertex#17 =
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
> cost={100.0 rows, 101.0 cpu, 0.0 io}
>
