[ 
https://issues.apache.org/jira/browse/FLINK-11862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengbm updated FLINK-11862:
----------------------------
    Description: 
List<String> fields = Lists.newArrayList("rawMessage","timestamp");
 Schema schema = new Schema();
 for (int i = 0; i < fields.size(); i++)

{ schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); }

tableEnvironment.connect(new Kafka()
 .version("0.8")
 .properties(properties)
 .topic("raw_playtime_h5_source")
 .startFromLatest()
 )
 .withFormat(new Json().failOnMissingField(false).deriveSchema())
 .withSchema(schema)
 .inAppendMode()
 .registerTableSource("t1");

Table table2 = tableEnvironment
 .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as 
flash from t1 ,LATERAL TABLE(split(rawMessage,'
 t')) as T(maps) ");

tableEnvironment.registerTable("t2", table2);

Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 
where maps_length>0");

TypeInformation typeInformation = table.getSchema().toRowType();

String[] columns = table.getSchema().getFieldNames();
 DataStream<String> dataStream = tableEnvironment
 .toAppendStream(table, typeInformation)
 .map(new PhysicTransformMap(columns, 0));

dataStream.print();

try

{ env.execute(); }

catch (Exception e)

{ e.printStackTrace(); }

{color:#d04437}注:kafka中的数据流格式如下\{"timestamp" : 
"xxxx","rawMessage":"xxx\txxx\txxxx\t"}(如果where条件换成timestamp则可用,或者把where条件去掉程序运行正常)函数split是把一条string切分为map结构{color}

报错信息如下

Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: 
Invalid input access.
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at scala.Option.getOrElse(Option.scala:120)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
 at 
org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:173)

  was:
List<String> fields = Lists.newArrayList("rawMessage","timestamp");
 Schema schema = new Schema();
 for (int i = 0; i < fields.size(); i++)

{ schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); }

tableEnvironment.connect(new Kafka()
 .version("0.8")
 .properties(properties)
 .topic("raw_playtime_h5_source")
 .startFromLatest()
 )
 .withFormat(new Json().failOnMissingField(false).deriveSchema())
 .withSchema(schema)
 .inAppendMode()
 .registerTableSource("t1");

Table table2 = tableEnvironment
 .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as 
flash from t1 ,LATERAL TABLE(split(rawMessage,'
t')) as T(maps) ");

tableEnvironment.registerTable("t2", table2);

Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 
where maps_length>0");

TypeInformation typeInformation = table.getSchema().toRowType();

String[] columns = table.getSchema().getFieldNames();
 DataStream<String> dataStream = tableEnvironment
 .toAppendStream(table, typeInformation)
 .map(new PhysicTransformMap(columns, 0));

dataStream.print();

try

{ env.execute(); }

catch (Exception e)

{ e.printStackTrace(); }

{color:#d04437}注:kafka中的数据流格式如下\{"timestamp" : 
"xxxx","rawMessage":"xxx\txxx\txxxx\t"}(如果where条件换成timestamp则可用,或者把where条件去掉程序运行正常){color}

报错信息如下

Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: 
Invalid input access.
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at scala.Option.getOrElse(Option.scala:120)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
 at 
org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:173)


> 在同一条流上进行多次不同的sql,第二个sql的where条件不可用
> ----------------------------------
>
>                 Key: FLINK-11862
>                 URL: https://issues.apache.org/jira/browse/FLINK-11862
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Table SQL
>    Affects Versions: 1.7.2
>         Environment: flink 1.7版本 java 1.8
>            Reporter: zhengbm
>            Priority: Major
>
> List<String> fields = Lists.newArrayList("rawMessage","timestamp");
>  Schema schema = new Schema();
>  for (int i = 0; i < fields.size(); i++)
> { schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); }
> tableEnvironment.connect(new Kafka()
>  .version("0.8")
>  .properties(properties)
>  .topic("raw_playtime_h5_source")
>  .startFromLatest()
>  )
>  .withFormat(new Json().failOnMissingField(false).deriveSchema())
>  .withSchema(schema)
>  .inAppendMode()
>  .registerTableSource("t1");
> Table table2 = tableEnvironment
>  .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as 
> flash from t1 ,LATERAL TABLE(split(rawMessage,'
>  t')) as T(maps) ");
> tableEnvironment.registerTable("t2", table2);
> Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from 
> t2 where maps_length>0");
> TypeInformation typeInformation = table.getSchema().toRowType();
> String[] columns = table.getSchema().getFieldNames();
>  DataStream<String> dataStream = tableEnvironment
>  .toAppendStream(table, typeInformation)
>  .map(new PhysicTransformMap(columns, 0));
> dataStream.print();
> try
> { env.execute(); }
> catch (Exception e)
> { e.printStackTrace(); }
> {color:#d04437}注:kafka中的数据流格式如下\{"timestamp" : 
> "xxxx","rawMessage":"xxx\txxx\txxxx\t"}(如果where条件换成timestamp则可用,或者把where条件去掉程序运行正常)函数split是把一条string切分为map结构{color}
> 报错信息如下
> Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: 
> Invalid input access.
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
>  at scala.Option.getOrElse(Option.scala:120)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
>  at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
>  at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
>  at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
>  at 
> org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864)
>  at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224)
>  at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:173)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to