You need to declare that the ROWTIME column is monotonic. In MockCatalogReader, 
note how ROWTIME is declared monotonic in the ORDERS and SHIPMENTS streams. 
That’s why some queries in SqlValidatorTest.testStreamGroupBy are valid and 
others are not. The key method is SqlValidatorTable.getMonotonicity(String 
columnName).

Julian

> On Feb 23, 2017, at 7:23 AM, 陈江枫 <[email protected]> wrote:
> 
> Hi,all I'm trying to integrate calcite with Kafka, I refrenced
> CsvStreamableTable.
> 
> Each ConsumerRecord is convert to Object[] using the fowlloing code:
> 
> static class ArrayRowConverter extends RowConverter<Object[]> {
>    private List<Schema.Field> fields;
> 
>    public ArrayRowConverter(List<Schema.Field> fields) {
>        this.fields = fields;
>    }
> 
>    @Override
>    Object[] convertRow(ConsumerRecord<String, GenericRecord> consumerRecord) {
>        Object[] objects = new Object[fields.size()+1];
>        int i = 0 ;
>        objects[i++] = consumerRecord.timestamp();
>        for(Schema.Field field : this.fields) {
>            Object obj = consumerRecord.value().get(field.name());
>            if( obj instanceof Utf8 ){
>                objects[i ++] = obj.toString();
>            }else {
>                objects[i ++] = obj;
>            }
>        }
>        return objects;
>    }
> }
> 
> Enumerator is implemented as following:
> 
> public E current() {
>    return current;
> }
> 
> public boolean moveNext() {
>    for(;;) {
>        if(cancelFlag.get()) {
>            return false;
>        }
>        ConsumerRecord<String, GenericRecord> record = getRecord();
>        if(record ==  null) {
>            try {
>                Thread.sleep(200L);
>            } catch (InterruptedException e) {
>                e.printStackTrace();
>            }
>            continue;
>        }
>        current = rowConvert.convertRow(record);
>        return true;
>    }
> }
> 
> I tested "SELECT STREAM * FROM Kafka.clicks", it works fine.
> 
> rowtime is the first column explicitly added,and the value is record
> Timestamp of Kafka.
> 
> But when I tried "SELECT STREAM FLOOR(rowtime TO HOUR)
> 
> AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks  GROUP BY FLOOR(rowtime
> TO HOUR), ip"
> 
> It threw exception
> 
> java.sql.SQLException: Error while executing SQL "SELECT STREAM
> FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks
> GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line
> 1, column 119: Streaming aggregation requires at least one monotonic
> expression in GROUP BY clause
>       at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
>       at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
> 
> 
> Could someone help?

Reply via email to