[
https://issues.apache.org/jira/browse/FLINK-15357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jingsong Lee updated FLINK-15357:
---------------------------------
Fix Version/s: (was: 1.9.2)
(was: 1.10.0)
> schema created by JsonRowSchemaConverter are not suitable for
> TableEnv.sqlQuery table schema
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-15357
> URL: https://issues.apache.org/jira/browse/FLINK-15357
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common, Table SQL / API
> Affects Versions: 1.9.1
> Environment: You can reproduce the bug with the following code:
> String sql = "SELECT count(*) as cnt, age, TUMBLE_START(rowtime, INTERVAL "
> + "'10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' "
> + "SECOND), age";
> StreamExecutionEnvironment senv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> StreamTableEnvironment tenv = StreamTableEnvironment.create(senv);
> DataStream<User> source = senv.addSource(new SourceFunction<User>() {
> @Override
> public void run(SourceContext<User> sourceContext) throws Exception {
> int i = 1000;
> String[] names = {"Hanmeimei", "Lilei"};
> while (i > 1) {
> sourceContext.collect(new User(names[i%2], i, new
> Timestamp(System.currentTimeMillis())));
> Thread.sleep(10);
> i--;
> }
> }
> @Override
> public void cancel() {
> }
> });
> tenv.registerDataStream("abc", source,
> "name, age, timestamp, rowtime.rowtime");
> Table table = tenv.sqlQuery(sql);
> List<Host> hosts = Arrays.asList(new Host("10.20.128.210", 19201, "http"));
> TypeInformation<Row> typeInformation = JsonRowSchemaConverter.convert("{" +
> " type:'object'," +
> " properties:{" +
> " cnt: {" +
> " type: 'number'" +
> " }," +
> " tumTime:{" +
> " type:'string'," +
> " format:'date-time'" +
> " }" +
> " }" +
> "}");
> RowTypeInfo typeInfo = (RowTypeInfo) typeInformation;
> TypeInformation<?>[] typeInformations = typeInfo.getFieldTypes();
> String[] fieldNames = typeInfo.getFieldNames();
> TableSchema.Builder builder = TableSchema.builder();
> for (int i = 0; i < typeInformations.length; i ++) {
> builder.field(fieldNames[i], typeInformations[i]);
> }
> Elasticsearch6UpsertTableSink establesink = new
> Elasticsearch6UpsertTableSink(
> true,
> builder.build(),
> hosts,
> "aggregation",
> "data",
> "$",
> "n/a",
> new JsonRowSerializationSchema.Builder(typeInformation).build(),
> XContentType.JSON,
> new IgnoringFailureHandler(),
> new HashMap<>()
> );
> tenv.registerTableSink("aggregationTableSink", establesink);
> table.insertInto("aggregationTableSink");
> }
> @Data
> @AllArgsConstructor
> @NoArgsConstructor
> public static class User {
> private String name;
> private Integer age;
> private Timestamp timestamp;
> }
> Reporter: 巫旭阳
> Priority: Major
> Original Estimate: 72h
> Remaining Estimate: 72h
>
> JsonRowSchemaConverter.convert(jsonString) creates a schema TypeInformation
> that supports only BigDecimal for the JSON type number, but a Table created
> by TableEnvironmentImpl.sqlQuery(sqlString) may have other numeric data
> types, such as Long or Integer.
> When the program runs, it throws an exception like the one below:
> Field types of query result and registered TableSink [XXX] do not match.
> Query result schema: [cnt: Long, tumTime: Timestamp]
> TableSink schema: [cnt: BigDecimal, tumTime: Timestamp]
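
The mismatch reported above can be illustrated with a small, self-contained sketch. It is a simplified model, not Flink's actual validation code: the class SchemaMismatchSketch and its methods are hypothetical names, and the positional comparison of field types is an assumption based on the error message. It shows why a query result with cnt: Long fails against a sink schema derived from a JSON "number", and why the check would pass if the query produced a BigDecimal for cnt (for example through a cast in the SQL, which the issue itself does not confirm as a fix).

```java
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of the schema check; NOT Flink's real code.
final class SchemaMismatchSketch {

    // Models the reported behavior: JSON "number" always maps to BigDecimal.
    static Class<?> jsonTypeToJava(String jsonType, String format) {
        switch (jsonType) {
            case "number":
                return BigDecimal.class;
            case "string":
                return "date-time".equals(format) ? Timestamp.class : String.class;
            default:
                throw new IllegalArgumentException("Unsupported in this sketch: " + jsonType);
        }
    }

    // Assumption: field types are compared position by position.
    static boolean fieldTypesMatch(Map<String, Class<?>> query, Map<String, Class<?>> sink) {
        return new ArrayList<>(query.values()).equals(new ArrayList<>(sink.values()));
    }

    // Schema of the query result; cntType is what the query yields for cnt.
    static Map<String, Class<?>> querySchema(Class<?> cntType) {
        Map<String, Class<?>> schema = new LinkedHashMap<>();
        schema.put("cnt", cntType);
        schema.put("tumTime", Timestamp.class);
        return schema;
    }

    // Sink schema derived from the JSON schema used in the reproduction code.
    static Map<String, Class<?>> sinkSchemaFromJson() {
        Map<String, Class<?>> schema = new LinkedHashMap<>();
        schema.put("cnt", jsonTypeToJava("number", null));
        schema.put("tumTime", jsonTypeToJava("string", "date-time"));
        return schema;
    }

    public static void main(String[] args) {
        // count(*) yields Long, so the types differ from the sink's BigDecimal.
        System.out.println(fieldTypesMatch(querySchema(Long.class), sinkSchemaFromJson()));
        // If the query produced BigDecimal for cnt, the schemas would line up.
        System.out.println(fieldTypesMatch(querySchema(BigDecimal.class), sinkSchemaFromJson()));
    }
}
```

In this model the first comparison fails and the second succeeds, which mirrors the two schemas printed in the exception. Whether aligning the types on the query side or on the sink side is preferable in a real job depends on the Flink version and planner in use.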
--
This message was sent by Atlassian Jira
(v8.3.4#803005)