Hi, sorry to bother you again.

@Override
public Statistic getStatistic() {
    return Statistics.of(100d,
        ImmutableList.<ImmutableBitSet>of(),
        RelCollations.createSingleton(0));
}

@Override
public TableType getJdbcTableType() {
    return TableType.STREAM;
}

I declared rowtime as increasing using the code above and executed the
GROUP BY query again. It no longer throws an exception, but no results
come out.
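
One possible reading of the missing output: a streaming GROUP BY on a monotonic key can only emit a group once a row with a later key has arrived, so the counts for an hour stay buffered until the first row of the next hour shows up. Below is a minimal sketch of that idea in plain Java. It is not Calcite code: the class HourlyCounter and its add method are made up for illustration, and whether the Enumerable convention evaluates the aggregate this way is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical illustration only (not a Calcite class): counts rows per
 * (hour, ip) and emits an hour's groups once a row from a later hour arrives.
 * Assumes rowtime never decreases.
 */
public class HourlyCounter {
    private static final long HOUR_MILLIS = 3_600_000L;

    // Start of the hour currently being accumulated; MIN_VALUE means "no rows yet".
    private long currentHour = Long.MIN_VALUE;
    // Per-ip counts for the current hour, in first-seen order.
    private final Map<String, Long> counts = new LinkedHashMap<>();

    /** Feeds one row; returns the groups that became complete (possibly empty). */
    public List<String> add(long rowtimeMillis, String ip) {
        // Equivalent of FLOOR(rowtime TO HOUR) on an epoch-millis timestamp.
        long hour = Math.floorDiv(rowtimeMillis, HOUR_MILLIS) * HOUR_MILLIS;
        List<String> emitted = new ArrayList<>();
        if (hour > currentHour) {
            // rowtime is increasing, so the earlier hour cannot receive more rows.
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                emitted.add(currentHour + "," + e.getKey() + "," + e.getValue());
            }
            counts.clear();
            currentHour = hour;
        }
        counts.merge(ip, 1L, Long::sum);
        return emitted;
    }
}
```

Under this model, a topic that has not yet received a row from the next hour would produce no output at all, even though rows are being consumed.
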
Another question: if I change the rowCount to
Double.POSITIVE_INFINITY, I get the following exception:
java.sql.SQLException: Error while executing SQL "SELECT STREAM
FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks
GROUP BY FLOOR(rowtime TO HOUR ), ip": Node
[rel#24:Subset#3.ENUMERABLE.[]] could not be implemented; planner
state:
Root: rel#24:Subset#3.ENUMERABLE.[]
Original rel:
Sets:
Set#0, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT
timestamp, VARCHAR(1) url, VARCHAR(1) referrer, VARCHAR(1) useragent,
VARCHAR(1) more)
rel#5:Subset#0.NONE.[0], best=null, importance=0.6561
rel#0:LogicalTableScan.NONE.[[0]](table=[KAFKA, clicks]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
rel#38:Subset#0.ENUMERABLE.[0], best=null,
importance=0.49499999999999994
rel#46:EnumerableTableScan.ENUMERABLE.[[0]](table=[KAFKA,
clicks]),
rowcount=1.7976931348623157E308, cumulative cost={Infinity rows,
Infinity cpu, 0.0 io}
rel#53:EnumerableInterpreter.ENUMERABLE.[0](input=rel#45:Subset#0.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
rel#55:EnumerableInterpreter.ENUMERABLE.[[0]](input=rel#45:Subset#0.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
rel#45:Subset#0.BINDABLE.[0], best=null, importance=0.49499999999999994
rel#44:BindableTableScan.BINDABLE.[[0]](table=[KAFKA, clicks]),
rowcount=1.7976931348623157E308, cumulative cost={Infinity rows,
Infinity cpu, 0.0 io}
Set#1, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip)
rel#7:Subset#1.NONE.[0], best=null, importance=0.7290000000000001
rel#6:LogicalProject.NONE.[[0]](input=rel#5:Subset#0.NONE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
rel#40:Subset#1.ENUMERABLE.[0], best=null,
importance=0.49499999999999994
rel#39:EnumerableProject.ENUMERABLE.[[0]](input=rel#38:Subset#0.ENUMERABLE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
Set#2, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT c)
rel#9:Subset#2.NONE.[], best=null, importance=0.81
rel#8:LogicalAggregate.NONE.[](input=rel#7:Subset#1.NONE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
rel#42:Subset#2.ENUMERABLE.[], best=null, importance=0.405
rel#41:EnumerableAggregate.ENUMERABLE.[](input=rel#40:Subset#1.ENUMERABLE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
Set#3, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT c)
rel#11:Subset#3.NONE.[], best=null, importance=0.9
rel#10:LogicalDelta.NONE.[](input=rel#9:Subset#2.NONE.[]),
rowcount=1.7976931348623158E307, cumulative cost={inf}
rel#12:LogicalProject.NONE.[](input=rel#11:Subset#3.NONE.[],rowtime=$0,ip=$1,c=$2),
rowcount=1.7976931348623158E307, cumulative cost={inf}
rel#30:LogicalAggregate.NONE.[](input=rel#29:Subset#5.NONE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
rel#24:Subset#3.ENUMERABLE.[], best=null, importance=1.0
rel#25:AbstractConverter.ENUMERABLE.[](input=rel#11:Subset#3.NONE.[],convention=ENUMERABLE,sort=[]),
rowcount=1.7976931348623158E307, cumulative cost={inf}
rel#26:EnumerableProject.ENUMERABLE.[](input=rel#24:Subset#3.ENUMERABLE.[],rowtime=$0,ip=$1,c=$2),
rowcount=1.7976931348623158E307, cumulative cost={inf}
rel#32:EnumerableAggregate.ENUMERABLE.[](input=rel#31:Subset#5.ENUMERABLE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
Set#5, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip)
rel#29:Subset#5.NONE.[0], best=null, importance=0.81
rel#27:LogicalDelta.NONE.[0](input=rel#7:Subset#1.NONE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
rel#31:Subset#5.ENUMERABLE.[0], best=null, importance=0.9
rel#49:EnumerableProject.ENUMERABLE.[[0]](input=rel#48:Subset#6.ENUMERABLE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
rel#37:Subset#5.NONE.[], best=null, importance=0.45
rel#27:LogicalDelta.NONE.[0](input=rel#7:Subset#1.NONE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
rel#36:LogicalProject.NONE.[](input=rel#35:Subset#6.NONE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
Set#6, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT
timestamp, VARCHAR(1) url, VARCHAR(1) referrer, VARCHAR(1) useragent,
VARCHAR(1) more)
rel#35:Subset#6.NONE.[0], best=null, importance=0.6187499999999999
rel#33:LogicalDelta.NONE.[0](input=rel#5:Subset#0.NONE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
rel#47:LogicalTableScan.NONE.[[0]](table=[KAFKA, clicks,
(STREAM)]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
rel#48:Subset#6.ENUMERABLE.[0], best=null, importance=0.81
rel#56:EnumerableInterpreter.ENUMERABLE.[0](input=rel#52:Subset#6.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
rel#58:EnumerableInterpreter.ENUMERABLE.[[0]](input=rel#52:Subset#6.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
rel#52:Subset#6.BINDABLE.[0], best=null, importance=0.7290000000000001
rel#51:BindableTableScan.BINDABLE.[[0]](table=[KAFKA, clicks,
(STREAM)]), rowcount=1.7976931348623157E308, cumulative cost={Infinity
rows, Infinity cpu, 0.0 io}
at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
at
org.apache.calcite.avatica.AvaticaStatement.executeInternal(AvaticaStatement.java:156)
at
org.apache.calcite.avatica.AvaticaStatement.executeQuery(AvaticaStatement.java:218)
at TestQuery.main(TestQuery.java:43)
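
In the dump above, every subset shows best=null and cumulative cost={inf}. One guess (an assumption, not checked against the planner source) is that a plan only becomes "best" when its cost is strictly less than the best so far, which starts at infinity, so a plan whose cost is itself infinite can never be selected. The plain-Java sketch below only demonstrates the double arithmetic behind that guess; the class PlanCostDemo and the method isImprovement are hypothetical and are not Calcite APIs.

```java
public class PlanCostDemo {
    /** Hypothetical stand-in for an "is this plan cheaper than the best so far" check. */
    public static boolean isImprovement(double candidateCost, double bestSoFar) {
        return candidateCost < bestSoFar;
    }

    public static void main(String[] args) {
        double best = Double.POSITIVE_INFINITY; // no plan chosen yet

        // A finite row count gives a finite cost, which beats "no plan yet".
        System.out.println(isImprovement(100d, best)); // true

        // An infinite cost is not strictly less than infinity.
        System.out.println(isImprovement(Double.POSITIVE_INFINITY, best)); // false

        // Scaling Double.MAX_VALUE overflows to infinity as well.
        System.out.println(isImprovement(Double.MAX_VALUE * 10, best)); // false
    }
}
```

If that guess is right, keeping the row count finite (as with the 100d used earlier) would be the simpler way to describe the stream to the planner.
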
2017-02-24 8:53 GMT+08:00 Julian Hyde <[email protected]>:
> You need to declare that the ROWTIME column is monotonic. In
> MockCatalogReader, note how ROWTIME is declared monotonic in the ORDERS and
> SHIPMENTS streams. That’s why some queries in
> SqlValidatorTest.testStreamGroupBy
> are valid and others are not. The key method is
> SqlValidatorTable.getMonotonicity(String
> columnName).
>
> Julian
>
> > On Feb 23, 2017, at 7:23 AM, 陈江枫 <[email protected]> wrote:
> >
> > Hi all, I'm trying to integrate Calcite with Kafka. I referenced
> > CsvStreamableTable.
> >
> > Each ConsumerRecord is converted to Object[] using the following code:
> >
> > static class ArrayRowConverter extends RowConverter<Object[]> {
> >     private List<Schema.Field> fields;
> >
> >     public ArrayRowConverter(List<Schema.Field> fields) {
> >         this.fields = fields;
> >     }
> >
> >     @Override
> >     Object[] convertRow(ConsumerRecord<String, GenericRecord> consumerRecord) {
> >         Object[] objects = new Object[fields.size() + 1];
> >         int i = 0;
> >         objects[i++] = consumerRecord.timestamp();
> >         for (Schema.Field field : this.fields) {
> >             Object obj = consumerRecord.value().get(field.name());
> >             if (obj instanceof Utf8) {
> >                 objects[i++] = obj.toString();
> >             } else {
> >                 objects[i++] = obj;
> >             }
> >         }
> >         return objects;
> >     }
> > }
> >
> > The Enumerator is implemented as follows:
> >
> > public E current() {
> >     return current;
> > }
> >
> > public boolean moveNext() {
> >     for (;;) {
> >         if (cancelFlag.get()) {
> >             return false;
> >         }
> >         ConsumerRecord<String, GenericRecord> record = getRecord();
> >         if (record == null) {
> >             try {
> >                 Thread.sleep(200L);
> >             } catch (InterruptedException e) {
> >                 e.printStackTrace();
> >             }
> >             continue;
> >         }
> >         current = rowConvert.convertRow(record);
> >         return true;
> >     }
> > }
> >
> > I tested "SELECT STREAM * FROM Kafka.clicks", and it works fine.
> >
> > rowtime is the first column, which I added explicitly; its value is
> > the Kafka record timestamp.
> >
> > But when I tried "SELECT STREAM FLOOR(rowtime TO HOUR)
> > AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks GROUP BY FLOOR(rowtime
> > TO HOUR), ip"
> >
> > it threw an exception:
> >
> > java.sql.SQLException: Error while executing SQL "SELECT STREAM
> > FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks
> > GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line
> > 1, column 119: Streaming aggregation requires at least one monotonic
> > expression in GROUP BY clause
> > at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
> > at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
> >
> >
> > Could someone help?
>
>