[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lynch Lee updated FLINK-8619:
-----------------------------
    Description: 
I ran some tests with DISTINCT on MySQL, shown below:

mysql> CREATE TABLE `rpt_tt` (
    ->   `target_id` varchar(50) NOT NULL DEFAULT '',
    ->   `target_type` varchar(50) NOT NULL DEFAULT '',
    ->   `amt_pay` bigint(20) DEFAULT NULL,
    ->   `down_payment` bigint(20) DEFAULT NULL,
    ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
    ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.01 sec)

mysql> insert into rpt_tt values("1","5","1","1");
Query OK, 1 row affected (0.00 sec)

mysql> insert into rpt_tt values("3","5","1","1");
Query OK, 1 row affected (0.00 sec)

mysql> insert into rpt_tt values("2","6","1","1");
Query OK, 1 row affected (0.00 sec)

mysql> insert into rpt_tt values("3","7","1","1");
Query OK, 1 row affected (0.00 sec)

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt;
+-------------+-----------+---------+--------------+
| target_type | target_id | amt_pay | down_payment |
+-------------+-----------+---------+--------------+
| 5           | 1         |       1 |            1 |
| 6           | 2         |       1 |            1 |
| 5           | 3         |       1 |            1 |
| 7           | 3         |       1 |            1 |
+-------------+-----------+---------+--------------+
4 rows in set (0.00 sec)

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type;
+-------------+-----------+---------+--------------+
| target_type | target_id | amt_pay | down_payment |
+-------------+-----------+---------+--------------+
| 5           | 1         |       1 |            1 |
| 6           | 2         |       1 |            1 |
| 7           | 3         |       1 |            1 |
+-------------+-----------+---------+--------------+
3 rows in set (0.00 sec)

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type,target_id,amt_pay,down_payment;
+-------------+-----------+---------+--------------+
| target_type | target_id | amt_pay | down_payment |
+-------------+-----------+---------+--------------+
| 5           | 1         |       1 |            1 |
| 5           | 3         |       1 |            1 |
| 6           | 2         |       1 |            1 |
| 7           | 3         |       1 |            1 |
+-------------+-----------+---------+--------------+
4 rows in set (0.01 sec)
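
(A note on why MySQL accepts the second query at all: as far as I understand, this only works because ONLY_FULL_GROUP_BY is not set in my server's sql_mode, so the non-grouped columns get their values from some arbitrary row of each group. This can be checked with:

mysql> SELECT @@sql_mode;

With ONLY_FULL_GROUP_BY enabled, MySQL rejects query 2 with a "not in GROUP BY clause" error, just as standard SQL requires.)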

 

But now I want to run a similar query with Flink SQL. The code is here:


import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.god.hala.flink.convertors.JsonIntoRow;   // assumed package for JsonIntoRow (not shown in this snippet)
import com.god.hala.flink.convertors.RowIntoJson;
import com.god.hala.flink.sources.DataSources;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.Properties;
import java.util.UUID;

public class KafkaConn2Topics1 {

    public static void main(String[] args) throws Exception {
        String inputTopic = "input-case01-test02";
        String outputTopic = "output-case01-test02";

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "data-node5:9092");
        props.setProperty("group.id", UUID.randomUUID().toString().replaceAll("-", ""));

        LocalStreamEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironment();
        streamEnv.setParallelism(1);
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(streamEnv);

        StreamQueryConfig qConfig = tableEnv.queryConfig();
        // A retention time of 0 means the query state is never cleaned up.
        qConfig.withIdleStateRetentionTime(Time.seconds(0));

        streamEnv.getConfig().enableSysoutLogging();
        streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));

        // Row schema for the incoming Kafka records: all four fields as strings.
        RowTypeInfo rowSchema = new RowTypeInfo(
                new TypeInformation[]{Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()},
                new String[]{"target_type", "target_id", "amt_pay", "down_payment"}
        );
        DataStream<Row> _inDataStream = streamEnv.addSource(kafkaSource(inputTopic, props))
                .map(new JsonIntoRow(rowSchema))
                .returns(rowSchema);

        final String _table = "table_" + UUID.randomUUID().toString().replaceAll("-", "");
        tableEnv.registerDataStream(_table, _inDataStream);

        final String _in_fields = " target_id, amt_pay, down_payment";
        String sql = "select distinct(target_type)," + _in_fields + " from " + _table + " group by target_type";
        System.out.println(sql);

        Table resultTable = tableEnv.sql(sql);

        DataStream<Row> _outStream =
                tableEnv.toRetractStream(resultTable, Row.class, qConfig)
                        .process(new ProcessFunction<Tuple2<Boolean, Row>, Row>() {
                            @Override
                            public void processElement(Tuple2<Boolean, Row> value, Context ctx, Collector<Row> out) throws Exception {
                                ObjectNode node = new RowIntoJson(rowSchema).run(value.f1);
                                System.out.println("out1 row: " + node.toString());

                                // Forward only additions (f0 == true); drop retractions.
                                if (value.f0) {
                                    out.collect(value.f1);
                                    ObjectNode node1 = new RowIntoJson(rowSchema).run(value.f1);
                                    System.out.println("out11 row: " + node1.toString());
                                }
                            }
                        })
                        .map(new MapFunction<Row, Row>() {
                            @Override
                            public Row map(Row value) throws Exception {
                                ObjectNode node = new RowIntoJson(rowSchema).run(value);
                                System.out.println("out2 row: " + node.toString());
                                return value;
                            }
                        }).name("result-pickout1-source2")
                        .returns(rowSchema);
        _outStream.addSink(kafkaProducerJsonRow(outputTopic, props, rowSchema));

        streamEnv.execute(UUID.randomUUID().toString());
    }

    private static FlinkKafkaProducer010<Row> kafkaProducerJsonRow(String outputTopic, Properties props, RowTypeInfo rowSchema) {
        return new FlinkKafkaProducer010<>(outputTopic, new JsonRowSerializationSchema(rowSchema), props);
    }

    private static FlinkKafkaConsumer010<ObjectNode> kafkaSource(String inputTopic, Properties props) {
        return new FlinkKafkaConsumer010<>(inputTopic, new MyJson2ObjectNodeDeser(), props);
    }

    public static class MyJson2ObjectNodeDeser extends AbstractDeserializationSchema<ObjectNode> {

        private static final Logger LOGGER = LoggerFactory.getLogger(DataSources.MyJson2ObjectNodeDeser.class);

        private static ObjectMapper mapper = new ObjectMapper();

        static {
            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            mapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false);
            mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
        }

        @Override
        public ObjectNode deserialize(byte[] message) {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }

            try {
                ObjectNode jsonNode = mapper.readValue(message, ObjectNode.class);
                LOGGER.info("source data:{}", jsonNode);

                // Records are expected to carry the payload under a "record" key.
                if (!jsonNode.has("record")) {
                    LOGGER.warn("required section [record] not found, skipping, received:{}", jsonNode);
                    return JsonNodeFactory.instance.objectNode();
                }

                JsonNode record = jsonNode.get("record");
                if (!record.isObject()) {
                    LOGGER.warn("value of section [record] should be an object, please check your input:{}", jsonNode);
                    return JsonNodeFactory.instance.objectNode();
                }

                LOGGER.info("record data:{}", record);
                System.out.println("record data: " + record.toString());
                return (ObjectNode) record;
            } catch (Exception e) {
                LOGGER.warn("ETL cleanup failed for source stream data, please check your data schema. Data received: {}",
                        new String(message, Charset.forName("UTF-8")), e);
            }

            return null;
        }

        @Override
        public boolean isEndOfStream(ObjectNode nextElement) {
            return false;
        }
    }
}

 

I get the following error when I submit the job:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 31 to line 1, column 39: Expression 'target_id' is not being grouped
 at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
 at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
 at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:535)
 at com.god.hala.flink.jobs.KafkaConn2Topics.main(KafkaConn2Topics.java:86)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 31 to line 1, column 39: Expression 'target_id' is not being grouped
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
 at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
 at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803)
 at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4654)
 at org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:117)
 at org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:41)
 at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:344)
 at org.apache.calcite.sql.validate.AggregatingSelectScope.checkAggregateExpr(AggregatingSelectScope.java:231)
 at org.apache.calcite.sql.validate.AggregatingSelectScope.validateExpr(AggregatingSelectScope.java:240)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.validateExpr(SqlValidatorImpl.java:4016)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3989)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3218)
 at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
 at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:945)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:926)
 at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:901)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:611)
 at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89)
 ... 3 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Expression 'target_id' is not being grouped
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
 at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
 at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
 ... 22 more

 

I need help here:

1. Could someone point out what I am missing?

2. If I want to get the same result as the MySQL query below, what do I need to change in my code?

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type;
+-------------+-----------+---------+--------------+
| target_type | target_id | amt_pay | down_payment |
+-------------+-----------+---------+--------------+
| 5           | 1         |       1 |            1 |
| 6           | 2         |       1 |            1 |
| 7           | 3         |       1 |            1 |
+-------------+-----------+---------+--------------+
3 rows in set (0.00 sec)
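
(My own guess at a standard-compliant rewrite is below: every selected column is either in the GROUP BY clause or wrapped in an aggregate, so the validator should accept it. I am not sure it really matches the MySQL semantics, since MIN is evaluated per column while MySQL picks some arbitrary row per group, which is part of what I am asking:

select target_type,
       min(target_id)    as target_id,
       min(amt_pay)      as amt_pay,
       min(down_payment) as down_payment
from rpt_tt  -- or the registered Flink table name
group by target_type;

For the sample data above this returns the same three rows as the MySQL query.)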

 

Many thanks ~~

> Something about Flink SQL distinct, I need help
> -----------------------------------------------
>
>                 Key: FLINK-8619
>                 URL: https://issues.apache.org/jira/browse/FLINK-8619
>             Project: Flink
>          Issue Type: Wish
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Lynch Lee
>            Priority: Major
>             Fix For: 1.4.0
>


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
