[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lynch Lee updated FLINK-8619: ----------------------------- Description: I do some test about distinct on mysql below: mysql> CREATE TABLE `rpt_tt` ( -> `target_id` varchar(50) NOT NULL DEFAULT '', -> `target_type` varchar(50) NOT NULL DEFAULT '', -> `amt_pay` bigint(20) DEFAULT NULL, -> `down_payment` bigint(20) DEFAULT NULL, -> PRIMARY KEY (`target_id`,`target_type`,`amt_pay`) -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; Query OK, 0 rows affected (0.01 sec) mysql> insert into rpt_tt values("1","5","1","1"); Query OK, 1 row affected (0.00 sec) mysql> insert into rpt_tt values("3","5","1","1"); Query OK, 1 row affected (0.00 sec) mysql> insert into rpt_tt values("2","6","1","1"); Query OK, 1 row affected (0.00 sec) mysql> insert into rpt_tt values("3","7","1","1"); Query OK, 1 row affected (0.00 sec) mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt; +-------------+-----------+---------+--------------+ | target_type | target_id | amt_pay | down_payment | +-------------+-----------+---------+--------------+ | 5 | 1 | 1 | 1 | | 6 | 2 | 1 | 1 | | 5 | 3 | 1 | 1 | | 7 | 3 | 1 | 1 | +-------------+-----------+---------+--------------+ 4 rows in set (0.00 sec) mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type; +-------------+-----------+---------+--------------+ | target_type | target_id | amt_pay | down_payment | +-------------+-----------+---------+--------------+ | 5 | 1 | 1 | 1 | | 6 | 2 | 1 | 1 | | 7 | 3 | 1 | 1 | +-------------+-----------+---------+--------------+ 3 rows in set (0.00 sec) mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type,target_id,amt_pay,down_payment; +-------------+-----------+---------+--------------+ | target_type | target_id | amt_pay | down_payment | +-------------+-----------+---------+--------------+ | 5 | 1 | 1 | 1 | | 5 | 3 | 1 | 1 | | 6 | 2 | 1 | 1 | | 7 | 3 | 1 | 1 | 
+-------------+-----------+---------+--------------+ 4 rows in set (0.01 sec) But now, I want do some query on flink SQL, code is here: import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import com.god.hala.flink.convertors.RowIntoJson; import com.god.hala.flink.sources.DataSources; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.table.api.StreamQueryConfig; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; import java.util.Properties; import java.util.UUID; public class KafkaConn2Topics1 { 
public static void main(String[] args) throws Exception { String inputTopic = "input-case01-test02"; String outputTopic = "output-case01-test02"; Properties props = new Properties(); props.setProperty("bootstrap.servers", "data-node5:9092"); props.setProperty("group.id", UUID.randomUUID().toString().replaceAll("-", "")); LocalStreamEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironment(); streamEnv.setParallelism(1); streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(streamEnv); StreamQueryConfig qConfig = tableEnv.queryConfig(); qConfig.withIdleStateRetentionTime(Time.seconds(0)); streamEnv.getConfig().enableSysoutLogging(); streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); RowTypeInfo rowSchema = new RowTypeInfo( new TypeInformation[]\{Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()}, new String[]\{"target_type", "target_id", "amt_pay", "down_payment"} ); DataStream<Row> _inDataStream = streamEnv.addSource(kafkaSource(inputTopic, props)) .map(new JsonIntoRow(rowSchema)) .returns(rowSchema); final String _table = "table_" + UUID.randomUUID().toString().replaceAll("-", ""); tableEnv.registerDataStream(_table, _inDataStream); final String _in_fields = " target_id, amt_pay, down_payment"; String sql = "select distinct(target_type)," + _in_fields + " from " + _table + " group by target_type"; System.out.println(sql); Table resultTable = tableEnv.sql(sql); DataStream<Row> _outStream = tableEnv.toRetractStream(resultTable, Row.class, qConfig) .process(new ProcessFunction<Tuple2<Boolean, Row>, Row>() { @Override public void processElement(Tuple2<Boolean, Row> value, Context ctx, Collector<Row> out) throws Exception { ObjectNode node = new RowIntoJson(rowSchema).run(value.f1); System.out.println("out1 row: " + node.toString()); if 
(value.f0) { out.collect(value.f1); ObjectNode node1 = new RowIntoJson(rowSchema).run(value.f1); System.out.println("out11 row: " + node1.toString()); } } }) .map(new MapFunction<Row, Row>() { @Override public Row map(Row value) throws Exception { ObjectNode node = new RowIntoJson(rowSchema).run(value); System.out.println("out2 row: " + node.toString()); return value; } }).name("result-pickout1-source2") .returns(rowSchema); _outStream.addSink(kafkaProducerJsonRow(outputTopic, props, rowSchema)); streamEnv.execute(UUID.randomUUID().toString()); } private static FlinkKafkaProducer010<Row> kafkaProducerJsonRow(String outputTopic, Properties props, RowTypeInfo rowSchema) { return new FlinkKafkaProducer010<>(outputTopic, new JsonRowSerializationSchema(rowSchema), props); } private static FlinkKafkaConsumer010<ObjectNode> kafkaSource(String inputTopic, Properties props) { return new FlinkKafkaConsumer010<>(inputTopic, new MyJson2ObjectNodeDeser(), props); } public static class MyJson2ObjectNodeDeser extends AbstractDeserializationSchema<ObjectNode> { private static final Logger LOGGER = LoggerFactory.getLogger(DataSources.MyJson2ObjectNodeDeser.class); private static ObjectMapper mapper = new ObjectMapper(); static { mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false); mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true); } @Override public ObjectNode deserialize(byte[] message) { if (mapper == null) { mapper = new ObjectMapper(); } try { ObjectNode jsonNode = mapper.readValue(message, ObjectNode.class); LOGGER.info("source data:{}", jsonNode); if (!jsonNode.has("record")) { LOGGER.warn("not required section[record] found, pass, received:{}", jsonNode); return JsonNodeFactory.instance.objectNode(); } JsonNode record = jsonNode.get("record"); if (!record.isObject()) { LOGGER.warn("value of section[record] should be Object. 
pls check your input:{}", jsonNode); return JsonNodeFactory.instance.objectNode(); } LOGGER.info("record data:{}", record); System.out.println("record data: " + record.toString()); return (ObjectNode) record; } catch (Exception e) { LOGGER.warn("ETL clean up fail for source stream data, pls check your data schema. fail over. data received: {}", new String(message, Charset.forName("UTF-8")), e); } return null; } @Override public boolean isEndOfStream(ObjectNode nextElement) { return false; } } } I got an error while I fire the job Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 31 to line 1, column 39: Expression 'target_id' is not being grouped at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93) at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561) at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:535) at com.god.hala.flink.jobs.KafkaConn2Topics.main(KafkaConn2Topics.java:86) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 31 to line 1, column 39: Expression 'target_id' is not being grouped at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4654) at org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:117) at 
org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:41) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:344) at org.apache.calcite.sql.validate.AggregatingSelectScope.checkAggregateExpr(AggregatingSelectScope.java:231) at org.apache.calcite.sql.validate.AggregatingSelectScope.validateExpr(AggregatingSelectScope.java:240) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateExpr(SqlValidatorImpl.java:4016) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3989) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3218) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:945) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:926) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:901) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:611) at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89) ... 3 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Expression 'target_id' is not being grouped at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) ... 22 more I need help here: 1. 
Could someone point out what I am missing? 2. If I want to get the same result as the MySQL query below, what should I change in my code? mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type; +-------------+-----------+---------+--------------+ | target_type | target_id | amt_pay | down_payment | +-------------+-----------+---------+--------------+ | 5 | 1 | 1 | 1 | | 6 | 2 | 1 | 1 | | 7 | 3 | 1 | 1 | +-------------+-----------+---------+--------------+ 3 rows in set (0.00 sec) many thanks ~~ > Some thing about Flink SQL distinct, I need help > ------------------------------------------------ > > Key: FLINK-8619 > URL: https://issues.apache.org/jira/browse/FLINK-8619 > Project: Flink > Issue Type: Wish > Components: Table API & SQL > Affects Versions: 1.4.0 > Reporter: Lynch Lee > Priority: Major > Fix For: 1.4.0 > > > I ran some tests with DISTINCT on MySQL, shown below: > > > mysql> CREATE TABLE `rpt_tt` ( > -> `target_id` varchar(50) NOT NULL DEFAULT '', > -> `target_type` varchar(50) NOT NULL DEFAULT '', > -> `amt_pay` bigint(20) DEFAULT NULL, > -> `down_payment` bigint(20) DEFAULT NULL, > -> PRIMARY KEY (`target_id`,`target_type`,`amt_pay`) > -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; > Query OK, 0 rows affected (0.01 sec) > > mysql> insert into rpt_tt values("1","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("2","6","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","7","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt; > +-------------+-----------+---------+--------------+ > | target_type | target_id | amt_pay | down_payment | > +-------------+-----------+---------+--------------+ > | 5 | 1 | 1 | 1 | > | 6 | 2 | 1 | 1 | > | 5 | 3 | 1 | 1 | > | 7 | 3 | 1 | 1 | > 
+-------------+-----------+---------+--------------+ > 4 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type; > +-------------+-----------+---------+--------------+ > | target_type | target_id | amt_pay | down_payment | > +-------------+-----------+---------+--------------+ > | 5 | 1 | 1 | 1 | > | 6 | 2 | 1 | 1 | > | 7 | 3 | 1 | 1 | > +-------------+-----------+---------+--------------+ > 3 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type,target_id,amt_pay,down_payment; > +-------------+-----------+---------+--------------+ > | target_type | target_id | amt_pay | down_payment | > +-------------+-----------+---------+--------------+ > | 5 | 1 | 1 | 1 | > | 5 | 3 | 1 | 1 | > | 6 | 2 | 1 | 1 | > | 7 | 3 | 1 | 1 | > +-------------+-----------+---------+--------------+ > 4 rows in set (0.01 sec) > > But now, > I want do some query on flink SQL, code is here: > import com.fasterxml.jackson.databind.DeserializationFeature; > import com.fasterxml.jackson.databind.JsonNode; > import com.fasterxml.jackson.databind.ObjectMapper; > import com.fasterxml.jackson.databind.node.JsonNodeFactory; > import com.fasterxml.jackson.databind.node.ObjectNode; > import com.god.hala.flink.convertors.RowIntoJson; > import com.god.hala.flink.sources.DataSources; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.common.restartstrategy.RestartStrategies; > import > org.apache.flink.api.common.serialization.AbstractDeserializationSchema; > import org.apache.flink.api.common.time.Time; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.typeutils.RowTypeInfo; > import org.apache.flink.streaming.api.CheckpointingMode; > import org.apache.flink.streaming.api.TimeCharacteristic; > import 
org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.ProcessFunction; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; > import > org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; > import org.apache.flink.table.api.StreamQueryConfig; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.Types; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > import org.apache.flink.util.Collector; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > import java.nio.charset.Charset; > import java.util.Properties; > import java.util.UUID; > public class KafkaConn2Topics1 { > public static void main(String[] args) throws Exception { > String inputTopic = "input-case01-test02"; > String outputTopic = "output-case01-test02"; > Properties props = new Properties(); > props.setProperty("bootstrap.servers", "data-node5:9092"); > props.setProperty("group.id", UUID.randomUUID().toString().replaceAll("-", > "")); > LocalStreamEnvironment streamEnv = > StreamExecutionEnvironment.createLocalEnvironment(); > streamEnv.setParallelism(1); > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); > StreamTableEnvironment tableEnv = > StreamTableEnvironment.getTableEnvironment(streamEnv); > StreamQueryConfig qConfig = tableEnv.queryConfig(); > qConfig.withIdleStateRetentionTime(Time.seconds(0)); > streamEnv.getConfig().enableSysoutLogging(); > > streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, > 10000)); > RowTypeInfo rowSchema = new RowTypeInfo( > new 
TypeInformation[]\{Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING()}, > new String[]\{"target_type", "target_id", "amt_pay", "down_payment"} > ); > DataStream<Row> _inDataStream = streamEnv.addSource(kafkaSource(inputTopic, > props)) > .map(new JsonIntoRow(rowSchema)) > .returns(rowSchema); > final String _table = "table_" + > UUID.randomUUID().toString().replaceAll("-", ""); > tableEnv.registerDataStream(_table, _inDataStream); > final String _in_fields = " target_id, amt_pay, down_payment"; > String sql = "select distinct(target_type)," + _in_fields + " from " + > _table + " group by target_type"; > System.out.println(sql); > Table resultTable = tableEnv.sql(sql); > DataStream<Row> _outStream = > tableEnv.toRetractStream(resultTable, Row.class, qConfig) > .process(new ProcessFunction<Tuple2<Boolean, Row>, Row>() { > @Override > public void processElement(Tuple2<Boolean, Row> value, Context ctx, > Collector<Row> out) throws Exception { > ObjectNode node = new RowIntoJson(rowSchema).run(value.f1); > System.out.println("out1 row: " + node.toString()); > if (value.f0) { > out.collect(value.f1); > ObjectNode node1 = new RowIntoJson(rowSchema).run(value.f1); > System.out.println("out11 row: " + node1.toString()); > } > } > }) > .map(new MapFunction<Row, Row>() { > @Override > public Row map(Row value) throws Exception { > ObjectNode node = new RowIntoJson(rowSchema).run(value); > System.out.println("out2 row: " + node.toString()); > return value; > } > }).name("result-pickout1-source2") > .returns(rowSchema); > _outStream.addSink(kafkaProducerJsonRow(outputTopic, props, rowSchema)); > streamEnv.execute(UUID.randomUUID().toString()); > } > private static FlinkKafkaProducer010<Row> kafkaProducerJsonRow(String > outputTopic, Properties props, RowTypeInfo rowSchema) { > return new FlinkKafkaProducer010<>(outputTopic, new > JsonRowSerializationSchema(rowSchema), props); > } > private static FlinkKafkaConsumer010<ObjectNode> kafkaSource(String > inputTopic, 
Properties props) { > return new FlinkKafkaConsumer010<>(inputTopic, new MyJson2ObjectNodeDeser(), > props); > } > public static class MyJson2ObjectNodeDeser extends > AbstractDeserializationSchema<ObjectNode> { > private static final Logger LOGGER = > LoggerFactory.getLogger(DataSources.MyJson2ObjectNodeDeser.class); > private static ObjectMapper mapper = new ObjectMapper(); > static { > mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); > mapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false); > mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, > true); > } > @Override > public ObjectNode deserialize(byte[] message) { > if (mapper == null) { > mapper = new ObjectMapper(); > } > try { > ObjectNode jsonNode = mapper.readValue(message, ObjectNode.class); > LOGGER.info("source data:{}", jsonNode); > if (!jsonNode.has("record")) { > LOGGER.warn("not required section[record] found, pass, received:{}", > jsonNode); > return JsonNodeFactory.instance.objectNode(); > } > JsonNode record = jsonNode.get("record"); > if (!record.isObject()) { > LOGGER.warn("value of section[record] should be Object. pls check your > input:{}", jsonNode); > return JsonNodeFactory.instance.objectNode(); > } > LOGGER.info("record data:{}", record); > System.out.println("record data: " + record.toString()); > return (ObjectNode) record; > } catch (Exception e) { > LOGGER.warn("ETL clean up fail for source stream data, pls check your data > schema. fail over. data received: {}", new String(message, > Charset.forName("UTF-8")), e); > } > return null; > } > @Override > public boolean isEndOfStream(ObjectNode nextElement) { > return false; > } > } > } > > I got an error while I fire the job > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 1, column 31 to line 1, column 39: > Expression 'target_id' is not being grouped > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93) > at > org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561) > at > org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:535) > at com.god.hala.flink.jobs.KafkaConn2Topics.main(KafkaConn2Topics.java:86) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 31 to line 1, column 39: Expression 'target_id' is not being grouped > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4654) > at org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:117) > at org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:41) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:344) > at > org.apache.calcite.sql.validate.AggregatingSelectScope.checkAggregateExpr(AggregatingSelectScope.java:231) > at > org.apache.calcite.sql.validate.AggregatingSelectScope.validateExpr(AggregatingSelectScope.java:240) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateExpr(SqlValidatorImpl.java:4016) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3989) > at > 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3218) > at > org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:945) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:926) > at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:901) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:611) > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89) > ... 3 more > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Expression > 'target_id' is not being grouped > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) > ... 22 more > > I need help here: > 1. could someone point my missing? > 2 . if i want got the result like the mysql usage below , what do i change my > code? 
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type; > +-------------+-----------+---------+--------------+ > | target_type | target_id | amt_pay | down_payment | > +-------------+-----------+---------+--------------+ > | 5 | 1 | 1 | 1 | > | 6 | 2 | 1 | 1 | > | 7 | 3 | 1 | 1 | > +-------------+-----------+---------+--------------+ > 3 rows in set (0.00 sec) > > many thanks ~~ > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)