[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timo Walther closed FLINK-8619. ------------------------------- Resolution: Won't Fix Hi [~lynchlee], Jira is used for tracking bugs or proposing new features. If you have questions like this, please use our mailing lists (http://flink.apache.org/community.html#mailing-lists) and we are happy to help you. > Some thing about Flink SQL distinct, I need help > ------------------------------------------------ > > Key: FLINK-8619 > URL: https://issues.apache.org/jira/browse/FLINK-8619 > Project: Flink > Issue Type: Wish > Components: Table API & SQL > Affects Versions: 1.4.0 > Reporter: Lynch Lee > Priority: Major > Fix For: 1.4.0 > > > I do some test about distinct on mysql below: > > > mysql> CREATE TABLE `rpt_tt` ( > -> `target_id` varchar(50) NOT NULL DEFAULT '', > -> `target_type` varchar(50) NOT NULL DEFAULT '', > -> `amt_pay` bigint(20) DEFAULT NULL, > -> `down_payment` bigint(20) DEFAULT NULL, > -> PRIMARY KEY (`target_id`,`target_type`,`amt_pay`) > -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; > Query OK, 0 rows affected (0.01 sec) > > mysql> insert into rpt_tt values("1","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("2","6","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","7","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt; > +--------------+----------++-----------------------+ > |target_type|target_id|amt_pay|down_payment| > +--------------+----------++-----------------------+ > |5 |1 | 1| 1| > |6 |2 | 1| 1| > |5 |3 | 1| 1| > |7 |3 | 1| 1| > +--------------+----------++-----------------------+ > 4 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type; > +--------------+----------++-----------------------+ > 
|target_type|target_id|amt_pay|down_payment| > +--------------+----------++-----------------------+ > |5 |1 | 1| 1| > |6 |2 | 1| 1| > |7 |3 | 1| 1| > +--------------+----------++-----------------------+ > 3 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type,target_id,amt_pay,down_payment; > +--------------+----------++-----------------------+ > |target_type|target_id|amt_pay|down_payment| > +--------------+----------++-----------------------+ > |5 |1 | 1| 1| > |5 |3 | 1| 1| > |6 |2 | 1| 1| > |7 |3 | 1| 1| > +--------------+----------++-----------------------+ > 4 rows in set (0.01 sec) > > But now, > I want do some query on flink SQL, code is here: > import com.fasterxml.jackson.databind.DeserializationFeature; > import com.fasterxml.jackson.databind.JsonNode; > import com.fasterxml.jackson.databind.ObjectMapper; > import com.fasterxml.jackson.databind.node.JsonNodeFactory; > import com.fasterxml.jackson.databind.node.ObjectNode; > import com.god.hala.flink.convertors.RowIntoJson; > import com.god.hala.flink.sources.DataSources; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.common.restartstrategy.RestartStrategies; > import > org.apache.flink.api.common.serialization.AbstractDeserializationSchema; > import org.apache.flink.api.common.time.Time; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.typeutils.RowTypeInfo; > import org.apache.flink.streaming.api.CheckpointingMode; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.ProcessFunction; > import 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; > import > org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; > import org.apache.flink.table.api.StreamQueryConfig; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.Types; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > import org.apache.flink.util.Collector; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > import java.nio.charset.Charset; > import java.util.Properties; > import java.util.UUID; > public class KafkaConn2Topics1 { > public static void main(String[] args) throws Exception > { String inputTopic = "input-case01-test02"; String outputTopic = > "output-case01-test02"; Properties props = new Properties(); > props.setProperty("bootstrap.servers", "data-node5:9092"); > props.setProperty("group.id", UUID.randomUUID().toString().replaceAll("-", > "")); LocalStreamEnvironment streamEnv = > StreamExecutionEnvironment.createLocalEnvironment(); > streamEnv.setParallelism(1); > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); > StreamTableEnvironment tableEnv = > StreamTableEnvironment.getTableEnvironment(streamEnv); StreamQueryConfig > qConfig = tableEnv.queryConfig(); > qConfig.withIdleStateRetentionTime(Time.seconds(0)); > streamEnv.getConfig().enableSysoutLogging(); > streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, > 10000)); RowTypeInfo rowSchema = new RowTypeInfo( new > TypeInformation[]\\{Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING()} > , > new String[]\{"target_type", "target_id", "amt_pay", "down_payment"} > ); > DataStream<Row> _inDataStream = streamEnv.addSource(kafkaSource(inputTopic, > props)) > .map(new JsonIntoRow(rowSchema)) > .returns(rowSchema); 
> final String _table = "table_" + UUID.randomUUID().toString().replaceAll("-", > ""); > tableEnv.registerDataStream(_table, _inDataStream); > final String _in_fields = " target_id, amt_pay, down_payment"; > String sql = "select distinct(target_type)," + _in_fields + " from " + > _table + " group by target_type"; > System.out.println(sql); > Table resultTable = tableEnv.sql(sql); > DataStream<Row> _outStream = > tableEnv.toRetractStream(resultTable, Row.class, qConfig) > .process(new ProcessFunction<Tuple2<Boolean, Row>, Row>() { > @Override > public void processElement(Tuple2<Boolean, Row> value, Context ctx, > Collector<Row> out) throws Exception { > ObjectNode node = new RowIntoJson(rowSchema).run(value.f1); > System.out.println("out1 row: " + node.toString()); > if (value.f0) > { out.collect(value.f1); ObjectNode node1 = new > RowIntoJson(rowSchema).run(value.f1); System.out.println("out11 row: " + > node1.toString()); } > } > }) > .map(new MapFunction<Row, Row>() { > @Override > public Row map(Row value) throws Exception > { ObjectNode node = new RowIntoJson(rowSchema).run(value); > System.out.println("out2 row: " + node.toString()); return value; } > }).name("result-pickout1-source2") > .returns(rowSchema); > _outStream.addSink(kafkaProducerJsonRow(outputTopic, props, rowSchema)); > streamEnv.execute(UUID.randomUUID().toString()); > } > private static FlinkKafkaProducer010<Row> kafkaProducerJsonRow(String > outputTopic, Properties props, RowTypeInfo rowSchema) > { return new FlinkKafkaProducer010<>(outputTopic, new > JsonRowSerializationSchema(rowSchema), props); } > private static FlinkKafkaConsumer010<ObjectNode> kafkaSource(String > inputTopic, Properties props) > { return new FlinkKafkaConsumer010<>(inputTopic, new > MyJson2ObjectNodeDeser(), props); } > public static class MyJson2ObjectNodeDeser extends > AbstractDeserializationSchema<ObjectNode> { > private static final Logger LOGGER = > LoggerFactory.getLogger(DataSources.MyJson2ObjectNodeDeser.class); 
> private static ObjectMapper mapper = new ObjectMapper(); > static > { mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); > mapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false); > mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, > true); } > @Override > public ObjectNode deserialize(byte[] message) { > if (mapper == null) > { mapper = new ObjectMapper(); } > try { > ObjectNode jsonNode = mapper.readValue(message, ObjectNode.class); > LOGGER.info("source data:{}", jsonNode); > if (!jsonNode.has("record")) { > LOGGER.warn("not required section[record] found, pass, received:{}", > jsonNode); > return JsonNodeFactory.instance.objectNode(); > } > JsonNode record = jsonNode.get("record"); > if (!record.isObject()) { > LOGGER.warn("value of section[record] should be Object. pls check your > input:{}", jsonNode); > return JsonNodeFactory.instance.objectNode(); > } > LOGGER.info("record data:{}", record); > System.out.println("record data: " + record.toString()); > return (ObjectNode) record; > } catch (Exception e) { > LOGGER.warn("ETL clean up fail for source stream data, pls check your data > schema. fail over. data received: {}", new String(message, > Charset.forName("UTF-8")), e); > } > return null; > } > @Override > public boolean isEndOfStream(ObjectNode nextElement) > { return false; } > } > } > > I got an error while I fire the job > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 1, column 31 to line 1, column 39: > Expression 'target_id' is not being grouped > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93) > at > org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561) > at > org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:535) > at com.god.hala.flink.jobs.KafkaConn2Topics.main(KafkaConn2Topics.java:86) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 31 to line 1, column 39: Expression 'target_id' is not being grouped > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4654) > at org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:117) > at org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:41) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:344) > at > org.apache.calcite.sql.validate.AggregatingSelectScope.checkAggregateExpr(AggregatingSelectScope.java:231) > at > org.apache.calcite.sql.validate.AggregatingSelectScope.validateExpr(AggregatingSelectScope.java:240) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateExpr(SqlValidatorImpl.java:4016) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3989) > at > 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3218) > at > org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:945) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:926) > at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:901) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:611) > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89) > ... 3 more > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Expression > 'target_id' is not being grouped > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) > ... 22 more > > I need help here: > 1. Could someone point out what I am missing? > 2. If I want to get the result like the MySQL usage below, what should I > change in my code? 
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type; > +--------------+----------++-----------------------+ > |target_type|target_id|amt_pay|down_payment| > +--------------+----------++-----------------------+ > |5 |1 | 1| 1| > |6 |2 | 1| 1| > |7 |3 | 1| 1| > +--------------+----------++-----------------------+ > 3 rows in set (0.00 sec) > > many thanks ~~ > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)