Hi, Shawn

> I was hoping someone could give me a high level overview of how Calcite is
> executing the window function.

The Calcite runtime generates Java code and compiles it with Janino. For window processing in the Enumerable convention, there is a sample of the generated code in *org.apache.calcite.adapter.enumerable.EnumerableWindow#sampleOfTheGeneratedWindowedAggregate()*, which may be what you need.
In general, input rows are first organized into an *org.apache.calcite.runtime.SortedMultiMap* according to the partitioning key, and then all of the windowed aggregate functions are evaluated for each list of rows that share the same partitioning key.

> I would expect it to have to iterate through the data at least once for the
> window and once for the data itself.

Could you elaborate?

Best,
Feng

Shawn Weeks wrote on Wed, Jan 1, 2020 at 1:23 AM:
> Hi, I’m trying to troubleshoot an issue with the Apache NiFi project's use
> of Calcite to allow queries against flow files. NiFi presents an enumerable
> interface to Calcite (org.apache.calcite.linq4j.EnumerableDefaults). For
> queries with simple WHERE clauses and aggregations everything works great,
> but when I use window analytic functions like row_number against larger
> sets of records (2 GB), the process uses all allocated memory and every CPU
> core. I’ve attached a profiler and I’m only ever seeing 2-3 MB in live
> bytes, and most of the CPU usage is NiFi parsing the records.
>
> I was hoping someone could give me a high level overview of how Calcite is
> executing the window function. I would expect it to have to iterate through
> the data at least once for the window and once for the data itself.
>
> Here is the query being executed and a trace of the Calcite execution plan.
>
> select "antiNucleus",
> "eventFile",
> "eventNumber",
> "eventTime",
> "histFile",
> "multiplicity",
> "NaboveLb",
> "NbelowLb",
> "NLb",
> "primaryTracks",
> "prodTime",
> "Pt",
> "runNumber",
> "vertexX",
> "vertexY",
> "vertexZ",
> row_number() over() id
> from flowfile;
>
> 7199 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - Root:
> rel#10:Subset#1.ENUMERABLE.[]
> Original rel:
> LogicalProject(subset=[rel#10:Subset#1.ENUMERABLE.[]], antiNucleus=[$0],
> eventFile=[$1], eventNumber=[$2], eventTime=[$3], histFile=[$4],
> multiplicity=[$5], NaboveLb=[$6], NbelowLb=[$7], NLb=[$8],
> primaryTracks=[$9], prodTime=[$10], Pt=[$11], runNumber=[$12],
> vertexX=[$13], vertexY=[$14], vertexZ=[$15], id=[ROW_NUMBER() OVER (ROWS
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]): rowcount = 100.0,
> cumulative cost = {100.0 rows, 1700.0 cpu, 0.0 io}, id = 7
> FlowFileTableScan(subset=[rel#6:Subset#0.ENUMERABLE.[]],
> table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
> 14, 15]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0
> io}, id = 0
>
> Sets:
> Set#0, type: RecordType(JavaType(class java.lang.String) antiNucleus,
> JavaType(class java.lang.String) eventFile, JavaType(class
> java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
> JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
> multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
> java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
> JavaType(class java.lang.String) primaryTracks, JavaType(class
> java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
> JavaType(class java.lang.String) runNumber, JavaType(class
> java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
> JavaType(class java.lang.String) vertexZ)
> rel#6:Subset#0.ENUMERABLE.[], best=rel#0, importance=0.81
>
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
> cost={100.0 rows, 101.0 cpu, 0.0 io}
> Set#1, type: RecordType(JavaType(class java.lang.String) antiNucleus,
> JavaType(class java.lang.String) eventFile, JavaType(class
> java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
> JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
> multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
> java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
> JavaType(class java.lang.String) primaryTracks, JavaType(class
> java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
> JavaType(class java.lang.String) runNumber, JavaType(class
> java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
> JavaType(class java.lang.String) vertexZ, BIGINT id)
> rel#8:Subset#1.NONE.[], best=null, importance=0.9
>
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
> OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)), rowcount=100.0,
> cumulative cost={inf}
>
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={inf}
> rel#10:Subset#1.ENUMERABLE.[], best=rel#15, importance=1.0
>
> rel#11:AbstractConverter.ENUMERABLE.[](input=rel#8:Subset#1.NONE.[],convention=ENUMERABLE,sort=[]),
> rowcount=100.0, cumulative cost={inf}
>
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
> 0.0 io}
>
> 7200 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> EnumerableWindow#16
> 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
> Cheapest plan:
> EnumerableWindow(window#0=[window(partition {} order by [] rows between
> UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]): rowcount =
> 100.0, cumulative cost = {200.0 rows, 301.0 cpu, 0.0 io}, id = 16
> FlowFileTableScan(table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7,
> 8, 9, 10, 11, 12, 13, 14, 15]]): rowcount = 100.0, cumulative cost = {100.0
> rows, 101.0 cpu, 0.0 io}, id = 0
>
> 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
> Provenance:
> EnumerableWindow#16
> direct
>
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()]))
> call#112 rule [EnumerableWindowRule]
>
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()]))
> call#64 rule [ProjectToWindowRule:project]
>
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
> OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
> no parent
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
> no parent
>
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> HepRelVertex#17
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> EnumerableWindow#18
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> HepRelVertex#19
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> Breadth-first from root: {
> HepRelVertex#19 =
> rel#18:EnumerableWindow.ENUMERABLE.[[]](input=HepRelVertex#17,window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
> 0.0 io}
> HepRelVertex#17 =
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
> cost={100.0 rows, 101.0 cpu, 0.0 io}
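For illustration, here is a minimal, hypothetical Java sketch of the flow Feng describes: group the input rows by partitioning key, then evaluate the windowed aggregate over each partition's list of rows. It is not Calcite's generated code. The class WindowSketch, its rowNumber method and the Numbered holder are invented for this example, a LinkedHashMap of lists stands in for org.apache.calcite.runtime.SortedMultiMap, and only ROW_NUMBER() with the ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame seen in the plan is modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch for illustration only; this is not Calcite's generated code.
public class WindowSketch {

    /** Hypothetical holder: an input row paired with its ROW_NUMBER() value. */
    public static final class Numbered<R> {
        public final R row;
        public final long rowNumber;

        Numbered(R row, long rowNumber) {
            this.row = row;
            this.rowNumber = rowNumber;
        }
    }

    /**
     * Computes ROW_NUMBER() OVER (PARTITION BY partitionKey ORDER BY orderBy
     * ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
     * A null orderBy keeps each partition in input order.
     */
    public static <R, K> List<Numbered<R>> rowNumber(
            Iterable<R> input, Function<R, K> partitionKey, Comparator<R> orderBy) {
        // Pass 1: consume the whole input, appending each row to its partition's list.
        // The LinkedHashMap of lists is a stand-in for Calcite's SortedMultiMap.
        Map<K, List<R>> partitions = new LinkedHashMap<>();
        for (R row : input) {
            partitions.computeIfAbsent(partitionKey.apply(row), k -> new ArrayList<>()).add(row);
        }

        // Pass 2: evaluate the windowed aggregate over each partition's rows.
        List<Numbered<R>> output = new ArrayList<>();
        for (List<R> rows : partitions.values()) {
            if (orderBy != null) {
                rows.sort(orderBy); // the window's ORDER BY, applied within one partition
            }
            long count = 0;
            for (R row : rows) {
                // With this frame, ROW_NUMBER() is a running count within the partition.
                output.add(new Numbered<>(row, ++count));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("b2", "a1", "b1", "a3", "a2");

        // PARTITION BY the first letter, ORDER BY the whole string.
        List<Numbered<String>> byLetter =
                rowNumber(rows, s -> s.substring(0, 1), Comparator.naturalOrder());
        for (Numbered<String> n : byLetter) {
            System.out.println("by letter: " + n.row + " -> " + n.rowNumber);
        }

        // Like row_number() over (): a constant key puts every row in one partition.
        List<Numbered<String>> overAll = rowNumber(rows, row -> 0, null);
        for (Numbered<String> n : overAll) {
            System.out.println("over (): " + n.row + " -> " + n.rowNumber);
        }
    }
}
```

Calling rowNumber(rows, row -> 0, null) mirrors row_number() over () from the query, which the plan shows as partition {} order by []: every row falls into a single partition and the numbers follow input order. Note that in this sketch the first loop consumes the entire input before the first output row is produced, so all rows are held in memory at the same time.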
