The memory issue may be related with the implementation.
For row_number() function, the runtime processing logic is demonstrated as
below.
It can be seen that too many data structures (i.e., list/array/map) will be
created.

Best,
Feng


=====================================================================================================
  *// 1.Loading data set into a List*
  final java.util.List tempList = (java.util.List)
org.apache.calcite.linq4j.Linq4j.asEnumerable(***your data set***).into(new
java.util.ArrayList());

  *// 2.Sorting the data set (Note: a new map&Array&Iterator will be
constructed, see SortedMultiMap)*
  final java.util.Comparator comparator = new java.util.Comparator(){...};
  final java.util.Iterator iterator =
org.apache.calcite.runtime.SortedMultiMap.singletonArrayIterator(comparator,
tempList);

  *// 3.Constructing a new ArrayList for result*
  final java.util.ArrayList _list = new
java.util.ArrayList(tempList.size());

  *// 4. Computing row_number()*
  Long a0w0 = (Long) null; *// for row_number*
  while (iterator.hasNext()) {
    final Object[] _rows = (Object[]) iterator.next();
    for (int i = 0; i < _rows.length; ++i) {
      final org.apache.calcite.test.JdbcTest.Employee row =
(org.apache.calcite.test.JdbcTest.Employee) _rows[i];
      ......
      _list.add(new Object[] {
        ...... // row columns
        a0w0});
    }
  }

  tempList.clear();

  final org.apache.calcite.linq4j.Enumerable _inputEnumerable =
org.apache.calcite.linq4j.Linq4j.asEnumerable(_list);

  return new org.apache.calcite.linq4j.AbstractEnumerable(){
      public org.apache.calcite.linq4j.Enumerator enumerator() {
        return new org.apache.calcite.linq4j.Enumerator(){
            public final org.apache.calcite.linq4j.Enumerator
inputEnumerator = _inputEnumerable.enumerator();
            ......
            public Object current() {
              final Object[] current = (Object[]) inputEnumerator.current();
              return new Object[] { ... // row columns
                };
            }
          };
      }
    };

Shawn Weeks <[email protected]> 于2020年1月3日周五 上午12:16写道:

> In my example I'm not setting a partition key as I'm trying to generate
> just a sequential list of integers. It sounds like everything is held in
> memory in a SortedMultiMap which explains the high memory usage though I'm
> not sure why it's using several times the memory as the input record set.
> My test file is a 2gb CSV and I'll hit out of memory with an '-Xmx16g' on
> the process.
>
> My last question was to try and figure out how many times Calcite would
> have to iterate over the data to produce a result as when I give it large
> amounts of memory it does eventually finish it just takes substantially
> longer than the time to iterate through the data once. For example a simple
> select * might take 2-3 minutes for my test file but the row_number version
> takes 30+ minutes if it ever finishes.
>
> Just in case I missed it, Calcite doesn't have a simple counter function
> like Oracles rownum function does it?
>
> Thanks
> Shawn
>
> On 1/1/20, 9:55 PM, "Feng Zhu" <[email protected]> wrote:
>
>     Hi, Shawn
>
>     I was hoping someone could give me a high level overview of how
> Calcite is
>     > executing the window function.
>     >
>     Calcite runtime generates Java code and complies the code with Janino.
> For
>     Window Enumerable Process,
>     there is a sample piece of code in
>
> *org.apache.calcite.adapter.enumerable.EnumerableWindow#sampleOfTheGeneratedWindowedAggregate()*,
>     which may be what you need.
>
>     In general, input rows will be firstly organized into a*
>     org.apache.calcite.runtime.SortedMultiMap* according to partitioning
> key,
>     and then all of the windowed aggregate functions will be evaluated for
> each
>     list of rows that have the same partitioning key.
>
>     I would expect it to have to iterate through the data at least once
> for the
>     > window and once for the data itself.
>     >
>     Could you elaborate?
>
>     Best,
>     Feng
>
>     Shawn Weeks <[email protected]> 于2020年1月1日周三 上午1:23写道:
>
>     > Hi, I’m trying to troubleshoot an issue with the Apache NiFi
> projects use
>     > of Calcite to allow queries against flow files. NiFi presents an
> enumerable
>     > interface to Calcite org.apache.calcite.linq4j.EnumerableDefaults.
> For
>     > queries with simple where clause and aggregations everything works
> great
>     > but when I use window analytic functions like row_number against
> larger
>     > sets of records(2gb) the processes uses all memory allocated and
> every cpu
>     > core. I’ve attached a profiler and I’m only ever seeing 2-3 MB in
> live
>     > bytes and most of the CPU usage is NiFi parsing the records.
>     >
>     > I was hoping someone could give me a high level overview of how
> Calcite is
>     > executing the window function. I would expect it to have to iterate
> through
>     > the data at least once for the window and once for the data itself.
>     >
>     > Here is the query being executed and a trace of the Calcite
> execution plan.
>     >
>     > select  "antiNucleus",
>     >         "eventFile",
>     >         "eventNumber",
>     >         "eventTime",
>     >         "histFile",
>     >         "multiplicity",
>     >         "NaboveLb",
>     >         "NbelowLb",
>     >         "NLb",
>     >         "primaryTracks",
>     >         "prodTime",
>     >         "Pt",
>     >         "runNumber",
>     >         "vertexX",
>     >         "vertexY",
>     >         "vertexZ",
>     >         row_number() over() id
>     >     from flowfile;
>     >
>     > 7199 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> Root:
>     > rel#10:Subset#1.ENUMERABLE.[]
>     > Original rel:
>     > LogicalProject(subset=[rel#10:Subset#1.ENUMERABLE.[]],
> antiNucleus=[$0],
>     > eventFile=[$1], eventNumber=[$2], eventTime=[$3], histFile=[$4],
>     > multiplicity=[$5], NaboveLb=[$6], NbelowLb=[$7], NLb=[$8],
>     > primaryTracks=[$9], prodTime=[$10], Pt=[$11], runNumber=[$12],
>     > vertexX=[$13], vertexY=[$14], vertexZ=[$15], id=[ROW_NUMBER() OVER
> (ROWS
>     > BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]): rowcount = 100.0,
>     > cumulative cost = {100.0 rows, 1700.0 cpu, 0.0 io}, id = 7
>     >   FlowFileTableScan(subset=[rel#6:Subset#0.ENUMERABLE.[]],
>     > table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
> 12, 13,
>     > 14, 15]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0
> cpu, 0.0
>     > io}, id = 0
>     >
>     > Sets:
>     > Set#0, type: RecordType(JavaType(class java.lang.String) antiNucleus,
>     > JavaType(class java.lang.String) eventFile, JavaType(class
>     > java.lang.String) eventNumber, JavaType(class java.lang.String)
> eventTime,
>     > JavaType(class java.lang.String) histFile, JavaType(class
> java.lang.String)
>     > multiplicity, JavaType(class java.lang.String) NaboveLb,
> JavaType(class
>     > java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
>     > JavaType(class java.lang.String) primaryTracks, JavaType(class
>     > java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
>     > JavaType(class java.lang.String) runNumber, JavaType(class
>     > java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
>     > JavaType(class java.lang.String) vertexZ)
>     >         rel#6:Subset#0.ENUMERABLE.[], best=rel#0, importance=0.81
>     >
>     > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1,
> 2, 3,
>     > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0,
> cumulative
>     > cost={100.0 rows, 101.0 cpu, 0.0 io}
>     > Set#1, type: RecordType(JavaType(class java.lang.String) antiNucleus,
>     > JavaType(class java.lang.String) eventFile, JavaType(class
>     > java.lang.String) eventNumber, JavaType(class java.lang.String)
> eventTime,
>     > JavaType(class java.lang.String) histFile, JavaType(class
> java.lang.String)
>     > multiplicity, JavaType(class java.lang.String) NaboveLb,
> JavaType(class
>     > java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
>     > JavaType(class java.lang.String) primaryTracks, JavaType(class
>     > java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
>     > JavaType(class java.lang.String) runNumber, JavaType(class
>     > java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
>     > JavaType(class java.lang.String) vertexZ, BIGINT id)
>     >         rel#8:Subset#1.NONE.[], best=null, importance=0.9
>     >
>     >
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
>     > OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)),
> rowcount=100.0,
>     > cumulative cost={inf}
>     >
>     >
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
>     > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
>     > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={inf}
>     >         rel#10:Subset#1.ENUMERABLE.[], best=rel#15, importance=1.0
>     >
>     >
> rel#11:AbstractConverter.ENUMERABLE.[](input=rel#8:Subset#1.NONE.[],convention=ENUMERABLE,sort=[]),
>     > rowcount=100.0, cumulative cost={inf}
>     >
>     >
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
>     > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
>     > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0
> cpu,
>     > 0.0 io}
>     >
>     >
>     > 7200 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
>     > EnumerableWindow#16
>     > 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
>     > Cheapest plan:
>     > EnumerableWindow(window#0=[window(partition {} order by [] rows
> between
>     > UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]): rowcount
> =
>     > 100.0, cumulative cost = {200.0 rows, 301.0 cpu, 0.0 io}, id = 16
>     >   FlowFileTableScan(table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5,
> 6, 7,
>     > 8, 9, 10, 11, 12, 13, 14, 15]]): rowcount = 100.0, cumulative cost =
> {100.0
>     > rows, 101.0 cpu, 0.0 io}, id = 0
>     >
>     > 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
>     > Provenance:
>     > EnumerableWindow#16
>     >   direct
>     >
>     >
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
>     > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
>     > [ROW_NUMBER()]))
>     >       call#112 rule [EnumerableWindowRule]
>     >
>     >
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
>     > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
>     > [ROW_NUMBER()]))
>     >           call#64 rule [ProjectToWindowRule:project]
>     >
>     >
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
>     > OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
>     >               no parent
>     > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1,
> 2, 3,
>     > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
>     >   no parent
>     >
>     > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
>     > HepRelVertex#17
>     > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
>     > EnumerableWindow#18
>     > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
>     > HepRelVertex#19
>     > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
>     > Breadth-first from root:  {
>     >     HepRelVertex#19 =
>     >
> rel#18:EnumerableWindow.ENUMERABLE.[[]](input=HepRelVertex#17,window#0=window(partition
>     > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
>     > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0
> cpu,
>     > 0.0 io}
>     >     HepRelVertex#17 =
>     > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1,
> 2, 3,
>     > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0,
> cumulative
>     > cost={100.0 rows, 101.0 cpu, 0.0 io}
>     >
>
>
>

Reply via email to