Spongebob created FLINK-23177:
---------------------------------
Summary: watermarks generated in dateStream can not flow into table
Key: FLINK-23177
URL: https://issues.apache.org/jira/browse/FLINK-23177
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.13.1
Environment: flink: 1.13.1
Reporter: Spongebob
I have assigned watermark in dataStream and then use the `createTemporaryView`
method to build a table that is source from the dataStream. Out of my
expectation, the watermarks works normally in dataStream but the watermarks of
the table stay at -9223372036854775808 forever.
{code:java}
def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.enableCheckpointing(1000)
streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000))
val dataStream = streamEnv.socketTextStream("192.168.164.105", 9999)
val resultStream = dataStream.map(
value => {
val data = value.split(",")
(data(0), data(1).toInt)
}
).assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] {
override def extractTimestamp(element: (String, Int), recordTimestamp:
Long): Long = element._2 * 1000
}))
.process(new MyProcessFunc)
// resultStream.print("raw")
// streamEnv.execute("")
val streamTableEnv = buildStreamTableEnv(streamEnv,
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
streamTableEnv.createTemporaryView("catalog_test1", resultStream)
val catalog = buildHiveCatalog
streamTableEnv.registerCatalog("hive", catalog)
streamTableEnv.useCatalog("hive")
streamTableEnv.executeSql("insert into test1 select _1,_2 from
default_catalog.default_database.catalog_test1").print()
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)