Hi, I’m trying to troubleshoot an issue with the Apache NiFi project’s use of
Calcite to allow queries against flow files. NiFi presents an enumerable
interface to Calcite (org.apache.calcite.linq4j.EnumerableDefaults). For queries
with a simple WHERE clause and aggregations, everything works great, but when I
use window (analytic) functions like row_number against larger sets of records
(2 GB), the process uses all of the memory allocated and every CPU core. I’ve
attached a profiler and only ever see 2-3 MB of live bytes, and most of the CPU
usage is NiFi parsing the records.
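Since most of the CPU time goes to parsing, one thing worth ruling out is how many times the source gets enumerated. The sketch below assumes the Enumerable NiFi hands to Calcite is lazy, meaning it re-reads and re-parses the flow file content on every enumeration. `LazyRecordSource` is a hypothetical stand-in: it uses plain `java.lang.Iterable` rather than linq4j’s `Enumerable` and is not NiFi’s actual class. It only shows that each extra pass over a lazy source costs a full re-parse.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a record-backed source (not NiFi's class):
// every call to iterator() starts a fresh "parse" of the raw lines, the way
// a lazy Enumerable over flow file content would have to re-read it.
public class LazyRecordSource implements Iterable<String[]> {
    private final List<String> rawLines; // stand-in for flow file content
    private int parsePasses = 0;         // how many times the content was parsed

    public LazyRecordSource(List<String> rawLines) {
        this.rawLines = rawLines;
    }

    public int parsePasses() {
        return parsePasses;
    }

    @Override
    public Iterator<String[]> iterator() {
        parsePasses++; // each enumeration is a new pass over the underlying data
        Iterator<String> lines = rawLines.iterator();
        return new Iterator<>() {
            @Override
            public boolean hasNext() {
                return lines.hasNext();
            }

            @Override
            public String[] next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return lines.next().split(","); // "parse" one record
            }
        };
    }

    public static void main(String[] args) {
        LazyRecordSource source = new LazyRecordSource(List.of("a,1", "b,2", "c,3"));
        int rows = 0;
        for (String[] r : source) rows++; // first consumer
        for (String[] r : source) rows++; // second consumer re-parses everything
        System.out.println(rows + " rows read in " + source.parsePasses() + " parse passes");
    }
}
```

Running `main` prints `6 rows read in 2 parse passes`: the second loop pays the full parsing cost again even though no parsed rows are retained in between.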

I was hoping someone could give me a high-level overview of how Calcite
executes the window function. I would expect it to have to iterate through the
data at least once for the window and once for the data itself.
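To make that expectation concrete: `row_number() over()` has no PARTITION BY, so the whole input is a single partition. If an operator materializes a partition before computing window values, it would hold every row in memory at once. A minimal sketch of that two-pass model in plain Java follows. `BufferedRowNumber` is hypothetical and is not Calcite’s generated code; it only illustrates the "one pass for the window, one pass for the data" idea.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual two-pass window operator (hypothetical, not Calcite code):
// pass 1 materializes the whole partition, pass 2 emits rows with their
// window value. With no PARTITION BY, the partition is the entire input.
public class BufferedRowNumber {

    // Appends a 1-based ROW_NUMBER() to every row, buffering the whole input.
    public static List<List<Object>> rowNumber(Iterable<List<Object>> input) {
        // Pass 1: collect every input row in memory.
        List<List<Object>> buffer = new ArrayList<>();
        for (List<Object> row : input) {
            buffer.add(row);
        }

        // Pass 2: walk the buffer and attach the row number.
        List<List<Object>> output = new ArrayList<>(buffer.size());
        long rowNumber = 0;
        for (List<Object> row : buffer) {
            List<Object> out = new ArrayList<>(row);
            out.add(++rowNumber); // boxed to Long
            output.add(out);
        }
        return output;
    }

    public static void main(String[] args) {
        List<List<Object>> rows =
            List.of(List.<Object>of("a"), List.<Object>of("b"), List.<Object>of("c"));
        System.out.println(rowNumber(rows));
    }
}
```

Running `main` prints `[[a, 1], [b, 2], [c, 3]]`. The point of the design is the buffer: peak memory grows with the size of the partition, not with the size of a single row.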

Here is the query being executed and a trace of the Calcite execution plan.

select  "antiNucleus",
        "eventFile",
        "eventNumber",
        "eventTime",
        "histFile",
        "multiplicity",
        "NaboveLb",
        "NbelowLb",
        "NLb",
        "primaryTracks",
        "prodTime",
        "Pt",
        "runNumber",
        "vertexX",
        "vertexY",
        "vertexZ",
        row_number() over() id
    from flowfile;

7199 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - Root: rel#10:Subset#1.ENUMERABLE.[]
Original rel:
LogicalProject(subset=[rel#10:Subset#1.ENUMERABLE.[]], antiNucleus=[$0], eventFile=[$1], eventNumber=[$2], eventTime=[$3], histFile=[$4], multiplicity=[$5], NaboveLb=[$6], NbelowLb=[$7], NLb=[$8], primaryTracks=[$9], prodTime=[$10], Pt=[$11], runNumber=[$12], vertexX=[$13], vertexY=[$14], vertexZ=[$15], id=[ROW_NUMBER() OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]): rowcount = 100.0, cumulative cost = {100.0 rows, 1700.0 cpu, 0.0 io}, id = 7
  FlowFileTableScan(subset=[rel#6:Subset#0.ENUMERABLE.[]], table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 0

Sets:
Set#0, type: RecordType(JavaType(class java.lang.String) antiNucleus, JavaType(class java.lang.String) eventFile, JavaType(class java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime, JavaType(class java.lang.String) histFile, JavaType(class java.lang.String) multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb, JavaType(class java.lang.String) primaryTracks, JavaType(class java.lang.String) prodTime, JavaType(class java.lang.String) Pt, JavaType(class java.lang.String) runNumber, JavaType(class java.lang.String) vertexX, JavaType(class java.lang.String) vertexY, JavaType(class java.lang.String) vertexZ)
        rel#6:Subset#0.ENUMERABLE.[], best=rel#0, importance=0.81
                rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io}
Set#1, type: RecordType(JavaType(class java.lang.String) antiNucleus, JavaType(class java.lang.String) eventFile, JavaType(class java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime, JavaType(class java.lang.String) histFile, JavaType(class java.lang.String) multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb, JavaType(class java.lang.String) primaryTracks, JavaType(class java.lang.String) prodTime, JavaType(class java.lang.String) Pt, JavaType(class java.lang.String) runNumber, JavaType(class java.lang.String) vertexX, JavaType(class java.lang.String) vertexY, JavaType(class java.lang.String) vertexZ, BIGINT id)
        rel#8:Subset#1.NONE.[], best=null, importance=0.9
                rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER() OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)), rowcount=100.0, cumulative cost={inf}
                rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])), rowcount=100.0, cumulative cost={inf}
        rel#10:Subset#1.ENUMERABLE.[], best=rel#15, importance=1.0
                rel#11:AbstractConverter.ENUMERABLE.[](input=rel#8:Subset#1.NONE.[],convention=ENUMERABLE,sort=[]), rowcount=100.0, cumulative cost={inf}
                rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu, 0.0 io}


7200 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new EnumerableWindow#16
7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner - Cheapest plan:
EnumerableWindow(window#0=[window(partition {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]): rowcount = 100.0, cumulative cost = {200.0 rows, 301.0 cpu, 0.0 io}, id = 16
  FlowFileTableScan(table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 0

7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner - Provenance:
EnumerableWindow#16
  direct
    rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))
      call#112 rule [EnumerableWindowRule]
        rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))
          call#64 rule [ProjectToWindowRule:project]
            rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER() OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
              no parent
rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
  no parent

7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new HepRelVertex#17
7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new EnumerableWindow#18
7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new HepRelVertex#19
7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - Breadth-first from root:  {
    HepRelVertex#19 = rel#18:EnumerableWindow.ENUMERABLE.[[]](input=HepRelVertex#17,window#0=window(partition {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu, 0.0 io}
    HepRelVertex#17 = rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io}
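The cheapest plan puts EnumerableWindow directly on top of FlowFileTableScan with `partition {} order by []`. With no PARTITION BY and no ORDER BY, SQL leaves the numbering order unspecified, so numbering rows in arrival order is one legitimate choice. In principle that only needs a single streaming pass with a running counter. The sketch below is for comparison only: `StreamingRowNumber` is hypothetical, is not Calcite code, and says nothing about what EnumerableWindow actually does.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-pass ROW_NUMBER() OVER () (not Calcite code):
// with no PARTITION BY and no ORDER BY, each row can be numbered in
// arrival order, so a running counter suffices and no rows are buffered.
public class StreamingRowNumber implements Iterable<List<Object>> {
    private final Iterable<List<Object>> input;

    public StreamingRowNumber(Iterable<List<Object>> input) {
        this.input = input;
    }

    @Override
    public Iterator<List<Object>> iterator() {
        Iterator<List<Object>> it = input.iterator();
        return new Iterator<>() {
            private long counter = 0; // rows emitted so far

            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public List<Object> next() {
                // it.next() throws NoSuchElementException when input is exhausted
                List<Object> out = new ArrayList<>(it.next());
                out.add(++counter); // boxed to Long
                return out;
            }
        };
    }

    public static void main(String[] args) {
        List<List<Object>> rows = List.of(List.<Object>of("a"), List.<Object>of("b"));
        for (List<Object> row : new StreamingRowNumber(rows)) {
            System.out.println(row);
        }
    }
}
```

Running `main` prints `[a, 1]` and then `[b, 2]`. Only one row is held at a time, which is the behavior the profiler’s small live-bytes figure would be consistent with.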
