[ 
https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-8619:
-------------------------------------

Reopen to remove fix version

> Some thing about Flink SQL distinct, I need help
> ------------------------------------------------
>
>                 Key: FLINK-8619
>                 URL: https://issues.apache.org/jira/browse/FLINK-8619
>             Project: Flink
>          Issue Type: Wish
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Lynch Lee
>            Priority: Major
>             Fix For: 1.4.0
>
>
> I do some test about distinct on mysql below:
>  
>  
> mysql> CREATE TABLE `rpt_tt` (
>     ->   `target_id` varchar(50) NOT NULL DEFAULT '',
>     ->   `target_type` varchar(50) NOT NULL DEFAULT '',
>     ->   `amt_pay` bigint(20) DEFAULT NULL,
>     ->   `down_payment` bigint(20) DEFAULT NULL,
>     ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
>     ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> Query OK, 0 rows affected (0.01 sec)
>  
> mysql> insert into rpt_tt values("1","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("2","6","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","7","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt;
> +--------------+----------++-----------------------+
> |target_type|target_id|amt_pay|down_payment|
> +--------------+----------++-----------------------+
> |5          |1        |      1|            1|
> |6          |2        |      1|            1|
> |5          |3        |      1|            1|
> |7          |3        |      1|            1|
> +--------------+----------++-----------------------+
> 4 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type;
> +--------------+----------++-----------------------+
> |target_type|target_id|amt_pay|down_payment|
> +--------------+----------++-----------------------+
> |5          |1        |      1|            1|
> |6          |2        |      1|            1|
> |7          |3        |      1|            1|
> +--------------+----------++-----------------------+
> 3 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type,target_id,amt_pay,down_payment;
> +--------------+----------++-----------------------+
> |target_type|target_id|amt_pay|down_payment|
> +--------------+----------++-----------------------+
> |5          |1        |      1|            1|
> |5          |3        |      1|            1|
> |6          |2        |      1|            1|
> |7          |3        |      1|            1|
> +--------------+----------++-----------------------+
> 4 rows in set (0.01 sec)
>  
> But now,
> I want do some query on flink SQL, code is here:
> import com.fasterxml.jackson.databind.DeserializationFeature;
>  import com.fasterxml.jackson.databind.JsonNode;
>  import com.fasterxml.jackson.databind.ObjectMapper;
>  import com.fasterxml.jackson.databind.node.JsonNodeFactory;
>  import com.fasterxml.jackson.databind.node.ObjectNode;
>  import com.god.hala.flink.convertors.RowIntoJson;
>  import com.god.hala.flink.sources.DataSources;
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>  import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>  import org.apache.flink.api.common.time.Time;
>  import org.apache.flink.api.common.typeinfo.TypeInformation;
>  import org.apache.flink.api.java.tuple.Tuple2;
>  import org.apache.flink.api.java.typeutils.RowTypeInfo;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.TimeCharacteristic;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.apache.flink.streaming.api.functions.ProcessFunction;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
>  import 
> org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
>  import org.apache.flink.table.api.StreamQueryConfig;
>  import org.apache.flink.table.api.Table;
>  import org.apache.flink.table.api.Types;
>  import org.apache.flink.table.api.java.StreamTableEnvironment;
>  import org.apache.flink.types.Row;
>  import org.apache.flink.util.Collector;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
> import java.nio.charset.Charset;
>  import java.util.Properties;
>  import java.util.UUID;
> public class KafkaConn2Topics1 {
> public static void main(String[] args) throws Exception
> { String inputTopic = "input-case01-test02"; String outputTopic = 
> "output-case01-test02"; Properties props = new Properties(); 
> props.setProperty("bootstrap.servers", "data-node5:9092"); 
> props.setProperty("group.id", UUID.randomUUID().toString().replaceAll("-", 
> "")); LocalStreamEnvironment streamEnv = 
> StreamExecutionEnvironment.createLocalEnvironment(); 
> streamEnv.setParallelism(1); 
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
> streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); 
> StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.getTableEnvironment(streamEnv); StreamQueryConfig 
> qConfig = tableEnv.queryConfig(); 
> qConfig.withIdleStateRetentionTime(Time.seconds(0)); 
> streamEnv.getConfig().enableSysoutLogging(); 
> streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
>  10000)); RowTypeInfo rowSchema = new RowTypeInfo( new 
> TypeInformation[]\\{Types.STRING(), Types.STRING(), Types.STRING(), 
> Types.STRING()}
> ,
>  new String[]\{"target_type", "target_id", "amt_pay", "down_payment"}
>  );
>  DataStream<Row> _inDataStream = streamEnv.addSource(kafkaSource(inputTopic, 
> props))
>  .map(new JsonIntoRow(rowSchema))
>  .returns(rowSchema);
> final String _table = "table_" + UUID.randomUUID().toString().replaceAll("-", 
> "");
> tableEnv.registerDataStream(_table, _inDataStream);
> final String _in_fields = " target_id, amt_pay, down_payment";
>  String sql = "select distinct(target_type)," + _in_fields + " from " + 
> _table + " group by target_type";
>  System.out.println(sql);
> Table resultTable = tableEnv.sql(sql);
> DataStream<Row> _outStream =
>  tableEnv.toRetractStream(resultTable, Row.class, qConfig)
>  .process(new ProcessFunction<Tuple2<Boolean, Row>, Row>() {
>  @Override
>  public void processElement(Tuple2<Boolean, Row> value, Context ctx, 
> Collector<Row> out) throws Exception {
>  ObjectNode node = new RowIntoJson(rowSchema).run(value.f1);
>  System.out.println("out1 row: " + node.toString());
> if (value.f0)
> { out.collect(value.f1); ObjectNode node1 = new 
> RowIntoJson(rowSchema).run(value.f1); System.out.println("out11 row: " + 
> node1.toString()); }
> }
>  })
>  .map(new MapFunction<Row, Row>() {
>  @Override
>  public Row map(Row value) throws Exception
> { ObjectNode node = new RowIntoJson(rowSchema).run(value); 
> System.out.println("out2 row: " + node.toString()); return value; }
> }).name("result-pickout1-source2")
>  .returns(rowSchema);
>  _outStream.addSink(kafkaProducerJsonRow(outputTopic, props, rowSchema));
> streamEnv.execute(UUID.randomUUID().toString());
>  }
> private static FlinkKafkaProducer010<Row> kafkaProducerJsonRow(String 
> outputTopic, Properties props, RowTypeInfo rowSchema)
> { return new FlinkKafkaProducer010<>(outputTopic, new 
> JsonRowSerializationSchema(rowSchema), props); }
> private static FlinkKafkaConsumer010<ObjectNode> kafkaSource(String 
> inputTopic, Properties props)
> { return new FlinkKafkaConsumer010<>(inputTopic, new 
> MyJson2ObjectNodeDeser(), props); }
> public static class MyJson2ObjectNodeDeser extends 
> AbstractDeserializationSchema<ObjectNode> {
> private static final Logger LOGGER = 
> LoggerFactory.getLogger(DataSources.MyJson2ObjectNodeDeser.class);
> private static ObjectMapper mapper = new ObjectMapper();
> static
> { mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); 
> mapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false); 
> mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, 
> true); }
> @Override
>  public ObjectNode deserialize(byte[] message) {
>  if (mapper == null)
> { mapper = new ObjectMapper(); }
> try {
>  ObjectNode jsonNode = mapper.readValue(message, ObjectNode.class);
>  LOGGER.info("source data:{}", jsonNode);
> if (!jsonNode.has("record")) {
>  LOGGER.warn("not required section[record] found, pass, received:{}", 
> jsonNode);
>  return JsonNodeFactory.instance.objectNode();
>  }
> JsonNode record = jsonNode.get("record");
>  if (!record.isObject()) {
>  LOGGER.warn("value of section[record] should be Object. pls check your 
> input:{}", jsonNode);
>  return JsonNodeFactory.instance.objectNode();
>  }
> LOGGER.info("record data:{}", record);
>  System.out.println("record data: " + record.toString());
>  return (ObjectNode) record;
>  } catch (Exception e) {
>  LOGGER.warn("ETL clean up fail for source stream data, pls check your data 
> schema. fail over. data received: {}", new String(message, 
> Charset.forName("UTF-8")), e);
>  }
> return null;
>  }
> @Override
>  public boolean isEndOfStream(ObjectNode nextElement)
> { return false; }
> }
> }
>  
> I got an error while I fire the job 
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 1, column 31 to line 1, column 39: 
> Expression 'target_id' is not being grouped
>  at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
>  at 
> org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
>  at 
> org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:535)
>  at com.god.hala.flink.jobs.KafkaConn2Topics.main(KafkaConn2Topics.java:86)
>  Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 31 to line 1, column 39: Expression 'target_id' is not being grouped
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>  at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>  at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803)
>  at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4654)
>  at org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:117)
>  at org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:41)
>  at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:344)
>  at 
> org.apache.calcite.sql.validate.AggregatingSelectScope.checkAggregateExpr(AggregatingSelectScope.java:231)
>  at 
> org.apache.calcite.sql.validate.AggregatingSelectScope.validateExpr(AggregatingSelectScope.java:240)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateExpr(SqlValidatorImpl.java:4016)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3989)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3218)
>  at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:945)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:926)
>  at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:901)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:611)
>  at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89)
>  ... 3 more
>  Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Expression 
> 'target_id' is not being grouped
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>  at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>  at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>  ... 22 more
>  
> I need help here:
> 1.  could someone point my missing?
> 2 . if i want got the result like the mysql usage below , what should i 
> change my code?
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type;
> +--------------+----------++-----------------------+
> |target_type|target_id|amt_pay|down_payment|
> +--------------+----------++-----------------------+
> |5           |1         |       1|            1|
> |6           |2         |       1|            1|
> |7           |3         |       1|            1|
> +--------------+----------++-----------------------+
> 3 rows in set (0.00 sec)
>  
> many thanks ~~
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to