[
https://issues.apache.org/jira/browse/FLINK-22874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Spongebob closed FLINK-22874.
-----------------------------
Release Note: My misunderstanding of the streaming partition sink caused this
issue; it worked normally after I enabled checkpointing.
Resolution: Not A Problem
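
For context on the resolution: with the streaming file sink, in-progress files are finalized only when a checkpoint completes, and the partition commit policies (metastore registration, success files) run only after that, so with checkpointing disabled the partitions can never be committed. A minimal sketch of the fix, assuming the streamEnv from the report below is a regular StreamExecutionEnvironment; the interval and mode here are illustrative choices, not taken from the original report:

{code:java}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
// Files move from in-progress to finished only when a checkpoint completes,
// which is what allows the partition-commit trigger to fire downstream.
// The 60 s interval is an illustrative value.
streamEnv.enableCheckpointing(60 * 1000L, CheckpointingMode.EXACTLY_ONCE)
{code}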
> flink table partition trigger doesn't take effect as expected when sinking
> into hive table
> -------------------------------------------------------------------------------------
>
> Key: FLINK-22874
> URL: https://issues.apache.org/jira/browse/FLINK-22874
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.13.1
> Reporter: Spongebob
> Priority: Major
>
> I am trying to sink into a partitioned Hive table whose partition commit
> trigger is declared as "partition-time", and I have assigned a watermark on
> the DataStream. When I input some data into the DataStream, the sink never
> commits the Hive partitions on time. Here's my code:
> {code:sql}
> -- DDL of the Hive table
> create table test_table (username string)
> partitioned by (ts bigint)
> stored as orc
> TBLPROPERTIES (
>   'sink.partition-commit.trigger' = 'partition-time',
>   'sink.partition-commit.policy.kind' = 'metastore,success-file'
> );
> {code}
> {code:java}
> // Flink application code
> val streamEnv = ...
> val dataStream: DataStream[(String, Long)] = ...
>
> // Emit the current watermark alongside each record in a ProcessFunction
> // so it can be inspected downstream.
> class MyProcessFunction extends ProcessFunction[(String, Long), (String, Long, Long)] {
>   override def processElement(
>       value: (String, Long),
>       ctx: ProcessFunction[(String, Long), (String, Long, Long)]#Context,
>       out: Collector[(String, Long, Long)]): Unit = {
>     out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
>   }
> }
>
> // Assign timestamps and watermarks, then attach the watermark info.
> val resultStream = dataStream
>   .assignTimestampsAndWatermarks(
>     WatermarkStrategy
>       .forBoundedOutOfOrderness[(String, Long)](Duration.ZERO)
>       .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
>         override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long =
>           element._2 * 1000
>       }))
>   .process(new MyProcessFunction)
>
> val streamTableEnv = buildStreamTableEnv(
>   streamEnv,
>   EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
>
> // Convert the DataStream into a catalog table and sink it into Hive.
> streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
> val catalog = ...
> streamTableEnv.registerCatalog("hive", catalog)
> streamTableEnv.useCatalog("hive")
> streamTableEnv.executeSql(
>   "insert into test_table select _1, _2 from default_catalog.default_database.test_catalog_t").print()
> // Flink uses the default parallelism of 4.
>
> // Input data:
> (a, 1)
> (b, 2)
> (c, 3)
> (d, 4)
> (a, 5)
> ...
>
> // Result: many partition directories appear on HDFS, but they all contain
> // in-progress files that are never committed to the Hive metastore.
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)