In my example I'm not setting a partition key because I'm only trying to
generate a sequential list of integers. It sounds like everything is held in
memory in a SortedMultiMap, which explains the high memory usage, though I'm
not sure why it uses several times as much memory as the input record set. My
test file is a 2 GB CSV, and I run out of memory even with '-Xmx16g' on the
process.
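
To make the buffering concrete, here is a rough sketch of the behaviour
described in the reply quoted below. It is not Calcite's code: the class name
BufferedRowNumber is hypothetical, and a plain java.util.TreeMap stands in for
org.apache.calcite.runtime.SortedMultiMap. With no partition columns every row
gets the same (empty) key, so the single group ends up holding the whole input
before the first numbered row can be produced. Since every field is a
java.lang.String (per the trace), per-object overhead could plausibly make the
buffered rows larger than the CSV itself, but that is an assumption and not
something measured here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the described buffering; not Calcite's implementation.
final class BufferedRowNumber {
    /** Groups rows by the given partition columns, then numbers rows per group. */
    static List<Object[]> number(Iterable<Object[]> rows, int... partitionColumns) {
        // Phase 1: read the whole input and keep every row in a sorted map.
        Comparator<List<Object>> byKey = Comparator.comparing(Object::toString);
        Map<List<Object>, List<Object[]>> groups = new TreeMap<>(byKey);
        for (Object[] row : rows) {
            List<Object> key = new ArrayList<>();
            for (int c : partitionColumns) {
                key.add(row[c]);
            }
            // An empty partition list yields the same empty key for every row.
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        // Phase 2: walk each buffered group and append a 1-based row number.
        List<Object[]> out = new ArrayList<>();
        for (List<Object[]> group : groups.values()) {
            long n = 0;
            for (Object[] row : group) {
                Object[] numbered = Arrays.copyOf(row, row.length + 1);
                numbered[row.length] = ++n;
                out.add(numbered);
            }
        }
        return out;
    }
}
```

Calling BufferedRowNumber.number(rows) with no partition columns mirrors
row_number() over(): nothing can be emitted until phase 1 has consumed and
retained every row.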

My last question was an attempt to figure out how many times Calcite has to
iterate over the data to produce a result. When I give it a large amount of
memory it does eventually finish, but it takes substantially longer than a
single pass over the data. For example, a simple select * might take 2-3
minutes for my test file, but the row_number version takes 30+ minutes, if it
ever finishes.

Just in case I missed it: Calcite doesn't have a simple counter function like
Oracle's rownum, does it?
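
For comparison, this is the kind of single-pass counter I have in mind. It is
only a sketch in plain Java: StreamingRowNumber is a hypothetical helper, not
a Calcite or NiFi API, and it assumes rows arrive as Object[] from an
Iterator. It keeps one long of state and never retains a row after handing it
on.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical helper for illustration; not part of Calcite or NiFi.
final class StreamingRowNumber {
    /** Wraps an iterator so each row gets a trailing 1-based sequence number. */
    static Iterator<Object[]> number(Iterator<Object[]> rows) {
        return new Iterator<Object[]>() {
            private long counter = 0;

            @Override public boolean hasNext() {
                return rows.hasNext();
            }

            @Override public Object[] next() {
                Object[] row = rows.next();
                // Copy the row with one extra slot and store the next id there.
                Object[] numbered = Arrays.copyOf(row, row.length + 1);
                numbered[row.length] = ++counter;
                return numbered;
            }
        };
    }
}
```

Memory use here is independent of the input size, which is the behaviour I
would hope for when there is no partition key and no ordering.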

Thanks
Shawn

On 1/1/20, 9:55 PM, "Feng Zhu" <[email protected]> wrote:

    Hi, Shawn
    
    > I was hoping someone could give me a high level overview of how Calcite is
    > executing the window function.
    >
    Calcite generates Java code at runtime and compiles it with Janino. For
    the enumerable window implementation, there is a sample of the generated
    code in
    org.apache.calcite.adapter.enumerable.EnumerableWindow#sampleOfTheGeneratedWindowedAggregate(),
    which may be what you need.
    
    In general, input rows are first organized into an
    org.apache.calcite.runtime.SortedMultiMap according to the partitioning key,
    and then all of the windowed aggregate functions are evaluated for each
    list of rows that share the same partitioning key.
    
    > I would expect it to have to iterate through the data at least once for the
    > window and once for the data itself.
    >
    Could you elaborate?
    
    Best,
    Feng
    
    Shawn Weeks <[email protected]> wrote on Wed, Jan 1, 2020 at 1:23 AM:
    
    > Hi, I’m trying to troubleshoot an issue with the Apache NiFi project’s use
    > of Calcite to allow queries against flow files. NiFi presents an
    > enumerable interface to Calcite (org.apache.calcite.linq4j.EnumerableDefaults).
    > For queries with simple where clauses and aggregations everything works
    > great, but when I use window analytic functions like row_number against
    > larger sets of records (2 GB) the process uses all memory allocated and
    > every CPU core. I’ve attached a profiler and I’m only ever seeing 2-3 MB in
    > live bytes, and most of the CPU usage is NiFi parsing the records.
    >
    > I was hoping someone could give me a high level overview of how Calcite is
    > executing the window function. I would expect it to have to iterate through
    > the data at least once for the window and once for the data itself.
    >
    > Here is the query being executed and a trace of the Calcite execution plan.
    >
    > select  "antiNucleus",
    >         "eventFile",
    >         "eventNumber",
    >         "eventTime",
    >         "histFile",
    >         "multiplicity",
    >         "NaboveLb",
    >         "NbelowLb",
    >         "NLb",
    >         "primaryTracks",
    >         "prodTime",
    >         "Pt",
    >         "runNumber",
    >         "vertexX",
    >         "vertexY",
    >         "vertexZ",
    >         row_number() over() id
    >     from flowfile;
    >
    > 7199 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - Root:
    > rel#10:Subset#1.ENUMERABLE.[]
    > Original rel:
    > LogicalProject(subset=[rel#10:Subset#1.ENUMERABLE.[]], antiNucleus=[$0],
    > eventFile=[$1], eventNumber=[$2], eventTime=[$3], histFile=[$4],
    > multiplicity=[$5], NaboveLb=[$6], NbelowLb=[$7], NLb=[$8],
    > primaryTracks=[$9], prodTime=[$10], Pt=[$11], runNumber=[$12],
    > vertexX=[$13], vertexY=[$14], vertexZ=[$15], id=[ROW_NUMBER() OVER (ROWS
    > BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]): rowcount = 100.0,
    > cumulative cost = {100.0 rows, 1700.0 cpu, 0.0 io}, id = 7
    >   FlowFileTableScan(subset=[rel#6:Subset#0.ENUMERABLE.[]],
    > table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
    > 14, 15]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0
    > io}, id = 0
    >
    > Sets:
    > Set#0, type: RecordType(JavaType(class java.lang.String) antiNucleus,
    > JavaType(class java.lang.String) eventFile, JavaType(class
    > java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
    > JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
    > multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
    > java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
    > JavaType(class java.lang.String) primaryTracks, JavaType(class
    > java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
    > JavaType(class java.lang.String) runNumber, JavaType(class
    > java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
    > JavaType(class java.lang.String) vertexZ)
    >         rel#6:Subset#0.ENUMERABLE.[], best=rel#0, importance=0.81
    >
    > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
    > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
    > cost={100.0 rows, 101.0 cpu, 0.0 io}
    > Set#1, type: RecordType(JavaType(class java.lang.String) antiNucleus,
    > JavaType(class java.lang.String) eventFile, JavaType(class
    > java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
    > JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
    > multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
    > java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
    > JavaType(class java.lang.String) primaryTracks, JavaType(class
    > java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
    > JavaType(class java.lang.String) runNumber, JavaType(class
    > java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
    > JavaType(class java.lang.String) vertexZ, BIGINT id)
    >         rel#8:Subset#1.NONE.[], best=null, importance=0.9
    >
    > rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
    > OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)), rowcount=100.0,
    > cumulative cost={inf}
    >
    > rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
    > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
    > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={inf}
    >         rel#10:Subset#1.ENUMERABLE.[], best=rel#15, importance=1.0
    >
    > rel#11:AbstractConverter.ENUMERABLE.[](input=rel#8:Subset#1.NONE.[],convention=ENUMERABLE,sort=[]),
    > rowcount=100.0, cumulative cost={inf}
    >
    > rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
    > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
    > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
    > 0.0 io}
    >
    >
    > 7200 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
    > EnumerableWindow#16
    > 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
    > Cheapest plan:
    > EnumerableWindow(window#0=[window(partition {} order by [] rows between
    > UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]): rowcount =
    > 100.0, cumulative cost = {200.0 rows, 301.0 cpu, 0.0 io}, id = 16
    >   FlowFileTableScan(table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7,
    > 8, 9, 10, 11, 12, 13, 14, 15]]): rowcount = 100.0, cumulative cost = {100.0
    > rows, 101.0 cpu, 0.0 io}, id = 0
    >
    > 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
    > Provenance:
    > EnumerableWindow#16
    >   direct
    >
    > rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
    > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
    > [ROW_NUMBER()]))
    >       call#112 rule [EnumerableWindowRule]
    >
    > rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
    > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
    > [ROW_NUMBER()]))
    >           call#64 rule [ProjectToWindowRule:project]
    >
    > rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
    > OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
    >               no parent
    > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
    > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
    >   no parent
    >
    > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
    > HepRelVertex#17
    > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
    > EnumerableWindow#18
    > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
    > HepRelVertex#19
    > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
    > Breadth-first from root:  {
    >     HepRelVertex#19 =
    > rel#18:EnumerableWindow.ENUMERABLE.[[]](input=HepRelVertex#17,window#0=window(partition
    > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
    > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
    > 0.0 io}
    >     HepRelVertex#17 =
    > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
    > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
    > cost={100.0 rows, 101.0 cpu, 0.0 io}
    >
    
