[ 
https://issues.apache.org/jira/browse/FLINK-22874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-22874.
-----------------------------
    Release Note: My misunderstanding of the streaming partition sink caused 
this issue; it worked normally after I enabled checkpointing.
      Resolution: Not A Problem
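
For anyone who lands here with the same symptom: in streaming mode the Hive/filesystem sink only finalizes its in-progress files and commits partitions when a checkpoint completes, so the partition-commit trigger never fires while checkpointing is disabled. A minimal sketch of the fix (the 60 s interval is illustrative):

{code}
# flink-conf.yaml: enable periodic checkpointing so the sink can commit partitions
execution.checkpointing.interval: 60 s
{code}

Equivalently, it can be enabled in code via streamEnv.enableCheckpointing(60000).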

> flink table partition trigger doesn't take effect as expected when sinking 
> into hive table
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-22874
>                 URL: https://issues.apache.org/jira/browse/FLINK-22874
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.13.1
>            Reporter: Spongebob
>            Priority: Major
>
> I am trying to sink into a Hive partitioned table whose partition commit 
> trigger is declared as "partition-time", and I have assigned a watermark on 
> the DataStream. When I feed data into the DataStream, it does not commit the 
> Hive partitions on time. Here's my code:
> {code:java}
> //ddl of hive table 
> create table test_table(username string)
> partitioned by (ts bigint)
> stored as orc
> TBLPROPERTIES (
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.policy.kind'='metastore,success-file'
> );{code}
> {code:java}
> // flink application code
> val streamEnv = ...
> val dataStream: DataStream[(String, Long)] = ...
>
> // assign watermark and output watermark info in a process function
> class MyProcessFunction extends ProcessFunction[(String, Long), (String, Long, Long)] {
>   override def processElement(
>       value: (String, Long),
>       ctx: ProcessFunction[(String, Long), (String, Long, Long)]#Context,
>       out: Collector[(String, Long, Long)]): Unit = {
>     out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
>   }
> }
>
> val resultStream = dataStream
>   .assignTimestampsAndWatermarks(
>     WatermarkStrategy
>       .forBoundedOutOfOrderness(Duration.ZERO)
>       .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
>         override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long =
>           element._2 * 1000
>       }))
>   .process(new MyProcessFunction)
>
> val streamTableEnv = buildStreamTableEnv(
>   streamEnv,
>   EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
>
> // convert the dataStream into a hive catalog table and sink into hive
> streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
> val catalog = ...
> streamTableEnv.registerCatalog("hive", catalog)
> streamTableEnv.useCatalog("hive")
> streamTableEnv
>   .executeSql("insert into test_table select _1, _2 from default_catalog.default_database.test_catalog_t")
>   .print()
> // flink uses the default parallelism 4
> // input data
> (a, 1)
> (b, 2)
> (c, 3)
> (d, 4)
> (a, 5)
>  ...
> // result
> there are many partition directories on HDFS, but they all contain only 
> in-progress files and are never committed to the Hive metastore.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
