Hi, sorry to bother you again.

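    @Override
    public Statistic getStatistic() {
        // Estimated row count of 100, no unique keys, and a collation on
        // column 0 (rowtime), i.e. rows are sorted by rowtime ascending.
        return Statistics.of(100d,
                ImmutableList.<ImmutableBitSet>of(),
                RelCollations.createSingleton(0));
    }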

    @Override
    public TableType getJdbcTableType() {
        return TableType.STREAM;
    }

I declared rowtime as increasing using the code above and executed the
GROUP BY SQL again. It no longer throws an exception, but no results
are returned.
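
For context on why the validator insists on a monotonic expression in
GROUP BY, here is a minimal sketch in plain Java. It is not Calcite code,
and the class and method names are hypothetical. It assumes rowtime never
decreases, so once a row from a later hour arrives, the counts for the
previous hour are final and can be emitted. Whether Calcite's enumerable
aggregate emits rows this way is not established in this thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of Calcite or of my adapter.
final class HourlyCountSketch {
    private static final long HOUR_MS = 3_600_000L;

    /**
     * Counts rows per (hour, ip). Assumes rowtimes are non-decreasing, so a
     * change of hour means the previous hour is complete and can be emitted.
     * The last hour is deliberately not emitted: in an unbounded stream it
     * is still open when the input seen so far ends.
     */
    static List<String> aggregate(long[] rowtimes, String[] ips) {
        List<String> emitted = new ArrayList<>();
        Map<String, Long> counts = new LinkedHashMap<>();
        long currentHour = Long.MIN_VALUE; // sentinel: no hour seen yet
        for (int i = 0; i < rowtimes.length; i++) {
            // Equivalent of FLOOR(rowtime TO HOUR) on epoch milliseconds.
            long hour = Math.floorDiv(rowtimes[i], HOUR_MS) * HOUR_MS;
            if (hour != currentHour) {
                flush(currentHour, counts, emitted);
                currentHour = hour;
            }
            counts.merge(ips[i], 1L, Long::sum);
        }
        return emitted;
    }

    /** Emits one "hour,ip,count" line per group and clears the state. */
    private static void flush(long hour, Map<String, Long> counts,
                              List<String> emitted) {
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            emitted.add(hour + "," + e.getKey() + "," + e.getValue());
        }
        counts.clear();
    }

    public static void main(String[] args) {
        long[] rowtimes = {0L, 1_000L, 3_600_000L};
        String[] ips = {"a", "a", "b"};
        // Only the first hour is closed by the arrival of the third row.
        System.out.println(aggregate(rowtimes, ips));
    }
}
```

Without the non-decreasing assumption, a row for an earlier hour could
still arrive later, so no group could ever be declared complete.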


Another question: if I change the rowCount to
Double.POSITIVE_INFINITY, I get the following exception:

java.sql.SQLException: Error while executing SQL "SELECT STREAM
FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks
GROUP BY FLOOR(rowtime TO HOUR ), ip": Node
[rel#24:Subset#3.ENUMERABLE.[]] could not be implemented; planner
state:

Root: rel#24:Subset#3.ENUMERABLE.[]
Original rel:

Sets:
Set#0, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT
timestamp, VARCHAR(1) url, VARCHAR(1) referrer, VARCHAR(1) useragent,
VARCHAR(1) more)
        rel#5:Subset#0.NONE.[0], best=null, importance=0.6561
                rel#0:LogicalTableScan.NONE.[[0]](table=[KAFKA, clicks]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
        rel#38:Subset#0.ENUMERABLE.[0], best=null, 
importance=0.49499999999999994
                rel#46:EnumerableTableScan.ENUMERABLE.[[0]](table=[KAFKA, 
clicks]),
rowcount=1.7976931348623157E308, cumulative cost={Infinity rows,
Infinity cpu, 0.0 io}
                
rel#53:EnumerableInterpreter.ENUMERABLE.[0](input=rel#45:Subset#0.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
                
rel#55:EnumerableInterpreter.ENUMERABLE.[[0]](input=rel#45:Subset#0.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
        rel#45:Subset#0.BINDABLE.[0], best=null, importance=0.49499999999999994
                rel#44:BindableTableScan.BINDABLE.[[0]](table=[KAFKA, clicks]),
rowcount=1.7976931348623157E308, cumulative cost={Infinity rows,
Infinity cpu, 0.0 io}
Set#1, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip)
        rel#7:Subset#1.NONE.[0], best=null, importance=0.7290000000000001
                
rel#6:LogicalProject.NONE.[[0]](input=rel#5:Subset#0.NONE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
        rel#40:Subset#1.ENUMERABLE.[0], best=null, 
importance=0.49499999999999994
                
rel#39:EnumerableProject.ENUMERABLE.[[0]](input=rel#38:Subset#0.ENUMERABLE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
Set#2, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT c)
        rel#9:Subset#2.NONE.[], best=null, importance=0.81
                
rel#8:LogicalAggregate.NONE.[](input=rel#7:Subset#1.NONE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
        rel#42:Subset#2.ENUMERABLE.[], best=null, importance=0.405
                
rel#41:EnumerableAggregate.ENUMERABLE.[](input=rel#40:Subset#1.ENUMERABLE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
Set#3, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT c)
        rel#11:Subset#3.NONE.[], best=null, importance=0.9
                rel#10:LogicalDelta.NONE.[](input=rel#9:Subset#2.NONE.[]),
rowcount=1.7976931348623158E307, cumulative cost={inf}
                
rel#12:LogicalProject.NONE.[](input=rel#11:Subset#3.NONE.[],rowtime=$0,ip=$1,c=$2),
rowcount=1.7976931348623158E307, cumulative cost={inf}
                
rel#30:LogicalAggregate.NONE.[](input=rel#29:Subset#5.NONE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
        rel#24:Subset#3.ENUMERABLE.[], best=null, importance=1.0
                
rel#25:AbstractConverter.ENUMERABLE.[](input=rel#11:Subset#3.NONE.[],convention=ENUMERABLE,sort=[]),
rowcount=1.7976931348623158E307, cumulative cost={inf}
                
rel#26:EnumerableProject.ENUMERABLE.[](input=rel#24:Subset#3.ENUMERABLE.[],rowtime=$0,ip=$1,c=$2),
rowcount=1.7976931348623158E307, cumulative cost={inf}
                
rel#32:EnumerableAggregate.ENUMERABLE.[](input=rel#31:Subset#5.ENUMERABLE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
Set#5, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip)
        rel#29:Subset#5.NONE.[0], best=null, importance=0.81
                rel#27:LogicalDelta.NONE.[0](input=rel#7:Subset#1.NONE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
        rel#31:Subset#5.ENUMERABLE.[0], best=null, importance=0.9
                
rel#49:EnumerableProject.ENUMERABLE.[[0]](input=rel#48:Subset#6.ENUMERABLE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
        rel#37:Subset#5.NONE.[], best=null, importance=0.45
                rel#27:LogicalDelta.NONE.[0](input=rel#7:Subset#1.NONE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
                
rel#36:LogicalProject.NONE.[](input=rel#35:Subset#6.NONE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
Set#6, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT
timestamp, VARCHAR(1) url, VARCHAR(1) referrer, VARCHAR(1) useragent,
VARCHAR(1) more)
        rel#35:Subset#6.NONE.[0], best=null, importance=0.6187499999999999
                rel#33:LogicalDelta.NONE.[0](input=rel#5:Subset#0.NONE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
                rel#47:LogicalTableScan.NONE.[[0]](table=[KAFKA, clicks, 
(STREAM)]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
        rel#48:Subset#6.ENUMERABLE.[0], best=null, importance=0.81
                
rel#56:EnumerableInterpreter.ENUMERABLE.[0](input=rel#52:Subset#6.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
                
rel#58:EnumerableInterpreter.ENUMERABLE.[[0]](input=rel#52:Subset#6.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
        rel#52:Subset#6.BINDABLE.[0], best=null, importance=0.7290000000000001
                rel#51:BindableTableScan.BINDABLE.[[0]](table=[KAFKA, clicks,
(STREAM)]), rowcount=1.7976931348623157E308, cumulative cost={Infinity
rows, Infinity cpu, 0.0 io}


        at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
        at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
        at org.apache.calcite.avatica.AvaticaStatement.executeInternal(AvaticaStatement.java:156)
        at org.apache.calcite.avatica.AvaticaStatement.executeQuery(AvaticaStatement.java:218)
        at TestQuery.main(TestQuery.java:43)
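
A possible reading of the dump above, offered as an assumption rather than
a confirmed diagnosis: every candidate reports an infinite cost and every
subset shows best=null. A cost-based planner that only accepts a plan
strictly cheaper than its current best, starting from an infinite best,
can never select a plan whose own cost is infinite. The toy sketch below
(plain Java, hypothetical names, not Calcite's planner code) illustrates
that comparison.

```java
// Hypothetical illustration only; not Calcite's planner implementation.
final class InfiniteCostSketch {

    /** A candidate replaces the best plan only if it is strictly cheaper. */
    static boolean improves(double candidateCost, double bestCost) {
        return candidateCost < bestCost;
    }

    /**
     * Returns the index of the cheapest plan, or -1 if no candidate is
     * strictly cheaper than the initial "no plan yet" cost of infinity.
     */
    static int chooseBest(double[] costs) {
        double best = Double.POSITIVE_INFINITY; // no plan chosen yet
        int bestIndex = -1;
        for (int i = 0; i < costs.length; i++) {
            if (improves(costs[i], best)) {
                best = costs[i];
                bestIndex = i;
            }
        }
        return bestIndex;
    }

    public static void main(String[] args) {
        double inf = Double.POSITIVE_INFINITY;
        // All candidates infinite: nothing is ever selected.
        System.out.println(chooseBest(new double[] {inf, inf}));
        // A finite candidate is selected as usual.
        System.out.println(chooseBest(new double[] {500.0, 100.0, inf}));
    }
}
```

In this sketch chooseBest returns -1 when all costs are infinite and 1
for the costs {500.0, 100.0, Infinity}. If the real planner behaves
similarly, a large but finite row count would avoid the problem.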


2017-02-24 8:53 GMT+08:00 Julian Hyde:

> You need to declare that the ROWTIME column is monotonic. In
> MockCatalogReader, note how ROWTIME is declared monotonic in the ORDERS and
> SHIPMENTS streams. That’s why some queries in
> SqlValidatorTest.testStreamGroupBy are valid and others are not. The key
> method is SqlValidatorTable.getMonotonicity(String columnName).
>
> Julian
>
> > On Feb 23, 2017, at 7:23 AM, 陈江枫 wrote:
> >
> > Hi all, I'm trying to integrate Calcite with Kafka, using
> > CsvStreamableTable as a reference.
> >
> > Each ConsumerRecord is converted to an Object[] using the following code:
> >
> > static class ArrayRowConverter extends RowConverter<Object[]> {
> >    private List<Schema.Field> fields;
> >
> >    public ArrayRowConverter(List<Schema.Field> fields) {
> >        this.fields = fields;
> >    }
> >
> >    @Override
> >    Object[] convertRow(ConsumerRecord<String, GenericRecord> consumerRecord) {
> >        Object[] objects = new Object[fields.size()+1];
> >        int i = 0 ;
> >        objects[i++] = consumerRecord.timestamp();
> >        for(Schema.Field field : this.fields) {
> >            Object obj = consumerRecord.value().get(field.name());
> >            if( obj instanceof Utf8 ){
> >                objects[i ++] = obj.toString();
> >            }else {
> >                objects[i ++] = obj;
> >            }
> >        }
> >        return objects;
> >    }
> > }
> >
> > The Enumerator is implemented as follows:
> >
> > public E current() {
> >    return current;
> > }
> >
> > public boolean moveNext() {
> >    for(;;) {
> >        if(cancelFlag.get()) {
> >            return false;
> >        }
> >        ConsumerRecord<String, GenericRecord> record = getRecord();
> >        if(record ==  null) {
> >            try {
> >                Thread.sleep(200L);
> >            } catch (InterruptedException e) {
> >                e.printStackTrace();
> >            }
> >            continue;
> >        }
> >        current = rowConvert.convertRow(record);
> >        return true;
> >    }
> > }
> >
> > I tested "SELECT STREAM * FROM Kafka.clicks", and it works fine.
> >
> > rowtime is the first column, added explicitly; its value is the
> > Kafka record timestamp.
> >
> > But when I tried "SELECT STREAM FLOOR(rowtime TO HOUR) AS
> > rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks GROUP BY FLOOR(rowtime
> > TO HOUR), ip", it threw an exception:
> >
> > java.sql.SQLException: Error while executing SQL "SELECT STREAM
> > FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks
> > GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line
> > 1, column 119: Streaming aggregation requires at least one monotonic
> > expression in GROUP BY clause
> >       at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
> >       at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
> >
> >
> > Could someone help?
>
>
