The memory issue may be related to the implementation.
For the row_number() function, the runtime processing logic is shown
below.
As you can see, quite a few data structures (lists, arrays and maps) are
created along the way.
Best,
Feng
=====================================================================================================
// 1. Load the data set into a List
final java.util.List tempList = (java.util.List)
    org.apache.calcite.linq4j.Linq4j.asEnumerable(/* your data set */)
        .into(new java.util.ArrayList());

// 2. Sort the data set (note: a new map, array and iterator will be
//    constructed; see SortedMultiMap)
final java.util.Comparator comparator = new java.util.Comparator() {...};
final java.util.Iterator iterator =
    org.apache.calcite.runtime.SortedMultiMap.singletonArrayIterator(
        comparator, tempList);

// 3. Construct a new ArrayList for the result
final java.util.ArrayList _list = new java.util.ArrayList(tempList.size());

// 4. Compute row_number()
Long a0w0 = (Long) null; // for row_number
while (iterator.hasNext()) {
  final Object[] _rows = (Object[]) iterator.next();
  for (int i = 0; i < _rows.length; ++i) {
    final org.apache.calcite.test.JdbcTest.Employee row =
        (org.apache.calcite.test.JdbcTest.Employee) _rows[i];
    ......
    _list.add(new Object[] {
        ...... // row columns
        a0w0});
  }
}
tempList.clear();

final org.apache.calcite.linq4j.Enumerable _inputEnumerable =
    org.apache.calcite.linq4j.Linq4j.asEnumerable(_list);
return new org.apache.calcite.linq4j.AbstractEnumerable() {
  public org.apache.calcite.linq4j.Enumerator enumerator() {
    return new org.apache.calcite.linq4j.Enumerator() {
      public final org.apache.calcite.linq4j.Enumerator inputEnumerator =
          _inputEnumerable.enumerator();
      ......
      public Object current() {
        final Object[] current = (Object[]) inputEnumerator.current();
        return new Object[] {
            ... // row columns
        };
      }
    };
  }
};
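To make the shape of that computation easier to follow outside the generated code, here is a self-contained sketch of the same pattern, with a plain java.util map standing in for Calcite's SortedMultiMap (the class and method names below are mine, not Calcite's): materialize the input, group it by partition key, then number the rows within each partition. Note that the entire input is held in memory at once, which is where the footprint comes from.

```java
import java.util.*;

// Hypothetical illustration of the ROW_NUMBER() OVER (PARTITION BY key)
// processing pattern; this is NOT Calcite's actual generated code.
public class RowNumberSketch {

  // Returns each input row extended with a 1-based row number within its
  // partition. Rows are emitted partition by partition, so the relative
  // order of rows from different partitions changes.
  public static List<Object[]> rowNumber(List<Object[]> rows, int keyColumn) {
    // 1. Group rows by partition key (stand-in for SortedMultiMap);
    //    every row is buffered here before any output is produced.
    Map<Object, List<Object[]>> partitions = new LinkedHashMap<>();
    for (Object[] row : rows) {
      partitions.computeIfAbsent(row[keyColumn], k -> new ArrayList<>())
          .add(row);
    }
    // 2. Walk each partition and append the running counter as an
    //    extra trailing column.
    List<Object[]> result = new ArrayList<>(rows.size());
    for (List<Object[]> partition : partitions.values()) {
      long rowNum = 0;
      for (Object[] row : partition) {
        Object[] out = Arrays.copyOf(row, row.length + 1);
        out[row.length] = ++rowNum;
        result.add(out);
      }
    }
    return result;
  }
}
```

With no PARTITION BY (as in Shawn's query) every row lands in a single partition, yet the grouping structure and the full result list are still built, so the input is materialized at least twice.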
Shawn Weeks <[email protected]> 于2020年1月3日周五 上午12:16写道:
> In my example I'm not setting a partition key, as I'm trying to generate
> just a sequential list of integers. It sounds like everything is held in
> memory in a SortedMultiMap, which explains the high memory usage, though
> I'm not sure why it uses several times the memory of the input record set.
> My test file is a 2 GB CSV, and I hit out-of-memory with '-Xmx16g' on the
> process.
>
> My last question was to figure out how many times Calcite has to iterate
> over the data to produce a result: when I give it a large amount of
> memory it does eventually finish, it just takes substantially longer than
> the time to iterate through the data once. For example, a simple
> select * might take 2-3 minutes on my test file, but the row_number
> version takes 30+ minutes, if it finishes at all.
>
> Just in case I missed it, Calcite doesn't have a simple counter function
> like Oracle's ROWNUM pseudocolumn, does it?
>
> Thanks
> Shawn
>
> On 1/1/20, 9:55 PM, "Feng Zhu" <[email protected]> wrote:
>
> Hi, Shawn
>
>     > I was hoping someone could give me a high level overview of how
>     > Calcite is executing the window function.
>     >
>     Calcite's runtime generates Java code and compiles it with Janino.
>     For the window enumerable processing, there is a sample piece of
>     code in
>     org.apache.calcite.adapter.enumerable.EnumerableWindow#sampleOfTheGeneratedWindowedAggregate(),
>     which may be what you need.
>
>     In general, input rows are first organized into a
>     org.apache.calcite.runtime.SortedMultiMap according to the
>     partitioning key, and then all of the windowed aggregate functions
>     are evaluated for each list of rows that share the same partitioning
>     key.
>
>     > I would expect it to have to iterate through the data at least
>     > once for the window and once for the data itself.
>     >
>     Could you elaborate?
>
> Best,
> Feng
>
>     Shawn Weeks <[email protected]> wrote on Wed, Jan 1, 2020 at 1:23 AM:
>
>     > Hi, I’m trying to troubleshoot an issue with the Apache NiFi
>     > project’s use of Calcite to allow queries against flow files. NiFi
>     > presents an enumerable interface to Calcite via
>     > org.apache.calcite.linq4j.EnumerableDefaults. For queries with
>     > simple where clauses and aggregations everything works great, but
>     > when I use window analytic functions like row_number against larger
>     > sets of records (2 GB), the process uses all of the memory
>     > allocated and every CPU core. I’ve attached a profiler and I’m only
>     > ever seeing 2-3 MB in live bytes, and most of the CPU usage is NiFi
>     > parsing the records.
> >
> > I was hoping someone could give me a high level overview of how
> Calcite is
> > executing the window function. I would expect it to have to iterate
> through
> > the data at least once for the window and once for the data itself.
> >
> > Here is the query being executed and a trace of the Calcite
> execution plan.
> >
> > select "antiNucleus",
> > "eventFile",
> > "eventNumber",
> > "eventTime",
> > "histFile",
> > "multiplicity",
> > "NaboveLb",
> > "NbelowLb",
> > "NLb",
> > "primaryTracks",
> > "prodTime",
> > "Pt",
> > "runNumber",
> > "vertexX",
> > "vertexY",
> > "vertexZ",
> > row_number() over() id
> > from flowfile;
> >
> > 7199 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> Root:
> > rel#10:Subset#1.ENUMERABLE.[]
> > Original rel:
> > LogicalProject(subset=[rel#10:Subset#1.ENUMERABLE.[]],
> antiNucleus=[$0],
> > eventFile=[$1], eventNumber=[$2], eventTime=[$3], histFile=[$4],
> > multiplicity=[$5], NaboveLb=[$6], NbelowLb=[$7], NLb=[$8],
> > primaryTracks=[$9], prodTime=[$10], Pt=[$11], runNumber=[$12],
> > vertexX=[$13], vertexY=[$14], vertexZ=[$15], id=[ROW_NUMBER() OVER
> (ROWS
> > BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]): rowcount = 100.0,
> > cumulative cost = {100.0 rows, 1700.0 cpu, 0.0 io}, id = 7
> > FlowFileTableScan(subset=[rel#6:Subset#0.ENUMERABLE.[]],
> > table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
> 12, 13,
> > 14, 15]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0
> cpu, 0.0
> > io}, id = 0
> >
> > Sets:
> > Set#0, type: RecordType(JavaType(class java.lang.String) antiNucleus,
> > JavaType(class java.lang.String) eventFile, JavaType(class
> > java.lang.String) eventNumber, JavaType(class java.lang.String)
> eventTime,
> > JavaType(class java.lang.String) histFile, JavaType(class
> java.lang.String)
> > multiplicity, JavaType(class java.lang.String) NaboveLb,
> JavaType(class
> > java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
> > JavaType(class java.lang.String) primaryTracks, JavaType(class
> > java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
> > JavaType(class java.lang.String) runNumber, JavaType(class
> > java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
> > JavaType(class java.lang.String) vertexZ)
> > rel#6:Subset#0.ENUMERABLE.[], best=rel#0, importance=0.81
> >
> > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1,
> 2, 3,
> > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0,
> cumulative
> > cost={100.0 rows, 101.0 cpu, 0.0 io}
> > Set#1, type: RecordType(JavaType(class java.lang.String) antiNucleus,
> > JavaType(class java.lang.String) eventFile, JavaType(class
> > java.lang.String) eventNumber, JavaType(class java.lang.String)
> eventTime,
> > JavaType(class java.lang.String) histFile, JavaType(class
> java.lang.String)
> > multiplicity, JavaType(class java.lang.String) NaboveLb,
> JavaType(class
> > java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
> > JavaType(class java.lang.String) primaryTracks, JavaType(class
> > java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
> > JavaType(class java.lang.String) runNumber, JavaType(class
> > java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
> > JavaType(class java.lang.String) vertexZ, BIGINT id)
> > rel#8:Subset#1.NONE.[], best=null, importance=0.9
> >
> >
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
> > OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)),
> rowcount=100.0,
> > cumulative cost={inf}
> >
> >
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={inf}
> > rel#10:Subset#1.ENUMERABLE.[], best=rel#15, importance=1.0
> >
> >
> rel#11:AbstractConverter.ENUMERABLE.[](input=rel#8:Subset#1.NONE.[],convention=ENUMERABLE,sort=[]),
> > rowcount=100.0, cumulative cost={inf}
> >
> >
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0
> cpu,
> > 0.0 io}
> >
> >
> > 7200 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
> > EnumerableWindow#16
> > 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
> > Cheapest plan:
> > EnumerableWindow(window#0=[window(partition {} order by [] rows
> between
> > UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]): rowcount
> =
> > 100.0, cumulative cost = {200.0 rows, 301.0 cpu, 0.0 io}, id = 16
> > FlowFileTableScan(table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5,
> 6, 7,
> > 8, 9, 10, 11, 12, 13, 14, 15]]): rowcount = 100.0, cumulative cost =
> {100.0
> > rows, 101.0 cpu, 0.0 io}, id = 0
> >
> > 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
> > Provenance:
> > EnumerableWindow#16
> > direct
> >
> >
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> > [ROW_NUMBER()]))
> > call#112 rule [EnumerableWindowRule]
> >
> >
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> > [ROW_NUMBER()]))
> > call#64 rule [ProjectToWindowRule:project]
> >
> >
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
> > OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
> > no parent
> > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1,
> 2, 3,
> > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
> > no parent
> >
> > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
> > HepRelVertex#17
> > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
> > EnumerableWindow#18
> > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
> > HepRelVertex#19
> > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> > Breadth-first from root: {
> > HepRelVertex#19 =
> >
> rel#18:EnumerableWindow.ENUMERABLE.[[]](input=HepRelVertex#17,window#0=window(partition
> > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0
> cpu,
> > 0.0 io}
> > HepRelVertex#17 =
> > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1,
> 2, 3,
> > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0,
> cumulative
> > cost={100.0 rows, 101.0 cpu, 0.0 io}
> >
>
>
>