[
https://issues.apache.org/jira/browse/FLINK-11862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-11862:
-----------------------------------
Labels: auto-deprioritized-major auto-deprioritized-minor (was:
auto-deprioritized-major stale-minor)
Priority: Not a Priority (was: Minor)
This issue was labeled "stale-minor" 7 days ago and has not received any
updates, so it is being deprioritized. If this ticket is actually Minor, please
raise the priority and ask a committer to assign you the issue or revive the
public discussion.
> Multiple queries on the same stream cause an error in the WHERE condition of
> the second query
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-11862
> URL: https://issues.apache.org/jira/browse/FLINK-11862
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.7.2
> Environment: Flink 1.7, Java 1.8
> Reporter: zhengbm
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> List<String> fields = Lists.newArrayList("rawMessage", "timestamp");
> Schema schema = new Schema();
> for (int i = 0; i < fields.size(); i++) {
>     schema.field(fields.get(i), Types.STRING()).from(fields.get(i));
> }
> tableEnvironment.connect(new Kafka()
>         .version("0.8")
>         .properties(properties)
>         .topic("raw_playtime_h5_source")
>         .startFromLatest())
>     .withFormat(new Json().failOnMissingField(false).deriveSchema())
>     .withSchema(schema)
>     .inAppendMode()
>     .registerTableSource("t1");
> Table table2 = tableEnvironment
>     .sqlQuery("select maps, `timestamp`, CARDINALITY(maps) AS maps_length, 1 as flash "
>         + "from t1, LATERAL TABLE(split(rawMessage, '\t')) as T(maps)");
> tableEnvironment.registerTable("t2", table2);
> Table table = tableEnvironment
>     .sqlQuery("select `timestamp`, maps_length from t2 where maps_length > 0");
> TypeInformation typeInformation = table.getSchema().toRowType();
> String[] columns = table.getSchema().getFieldNames();
> DataStream<String> dataStream = tableEnvironment
>     .toAppendStream(table, typeInformation)
>     .map(new PhysicTransformMap(columns, 0));
> dataStream.print();
> try {
>     env.execute();
> } catch (Exception e) {
>     e.printStackTrace();
> }
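> The split table function referenced in LATERAL TABLE above is not included in the
> report. Since the query applies CARDINALITY(maps) to its output, it presumably emits
> an array (or map) per input row. A minimal sketch of such a UDTF, assuming a
> hypothetical Split class registered under the name "split" (the reporter's actual
> implementation may differ):
> {code:java}
> import org.apache.flink.table.functions.TableFunction;
>
> // Hypothetical UDTF: emits one row holding the array of tokens, so that
> // CARDINALITY(maps) in the query yields the number of tokens.
> public class Split extends TableFunction<String[]> {
>     public void eval(String value, String separator) {
>         if (value != null) {
>             collect(value.split(separator));
>         }
>     }
> }
>
> // Registration before use in SQL (assumed):
> // tableEnvironment.registerFunction("split", new Split());
> {code}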
> {color:#d04437}Note: a Kafka message looks like \{"timestamp" :
> "xxxx","rawMessage":"xxx\txxx\txxxx\t"}. If I remove the WHERE condition from the
> second query, it works: "select `timestamp`,maps_length from t2 " and "select
> `timestamp`,maps_length from t2 where timestamp>0" are both fine.{color}
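> Based on the description above, the failure appears to depend on whether the WHERE
> clause references a column derived from the table function output. The reported
> variants, side by side:
> {code:java}
> // Fails with CodeGenException "Invalid input access": the filter column
> // maps_length is computed from the UDTF output (CARDINALITY(maps)).
> tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 where maps_length>0");
>
> // Works: no WHERE condition at all.
> tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2");
>
> // Works: the filter column comes from the original source, not the UDTF.
> tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 where timestamp>0");
> {code}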
> {color:#d04437}The exception stack trace follows:{color}
> Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Invalid input access.
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
> at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
> at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
> at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
> at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
> at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
> at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
> at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967)
> at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894)
> at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864)
> at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224)
> at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:173)
--
This message was sent by Atlassian Jira
(v8.20.1#820001)