[ 
https://issues.apache.org/jira/browse/FLINK-27950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yantao updated FLINK-27950:
---------------------------
    Description: 
When I use Flink streaming to write into Hive, I set the following table property:
{code:java}
TBLPROPERTIES ('hive.output.file.extension'='.parquet') {code}
When I check the files in HDFS, I can't find any files with the ".parquet" suffix, but the property takes effect normally when I write via SQL in Hive.

Am I using it incorrectly, or is there another reason?

This is my demo:

 
{code:java}
val tableEnvSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tableEnvSettings)

val catalog = new HiveCatalog("myHive", "xx", "/usr/local/xx/conf")

tEnv.registerCatalog("myHive", catalog)
tEnv.useCatalog("myHive")
tEnv.useDatabase("xx")

// When using the Hive dialect, the flink-sql-connector-hive jar must be placed under flink/lib
tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
val createTSql: String =
  s"""
    |create table if not exists $hiveTable (
    | ...
    |)
    |...
    |TBLPROPERTIES (
    |  'sink.parallelism'='1',
    |  'partition.time-extractor.timestamp-pattern'='$$dt',
    |  'sink.shuffle-by-partition.enable'='true',
    |  'sink.partition-commit.policy.kind'='metastore,success-file',
    |  'hive.output.file.extension'='.parquet'
    |)
    |""".stripMargin
tEnv.executeSql(createTSql)
tEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

tEnv.createTemporaryView("t_xx", DataStream)

val insertSql: String =
  s"""
     |  insert into $hiveTable
     |  select *
     |  from t_xx
     |""".stripMargin

tEnv.executeSql(insertSql)
tEnv.dropTemporaryView("t_xx") {code}
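As a point of comparison (not a confirmed explanation for why `hive.output.file.extension` is ignored here): when writing Parquet through Flink's DataStream API, the part-file suffix is controlled explicitly via `OutputFileConfig`, independent of Hive table properties. The sketch below is only an illustration; `MyRecord` and the output path are hypothetical placeholders.

{code:java}
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}

// Hypothetical record type; replace with the actual payload class.
case class MyRecord(dt: String, value: Long)

// OutputFileConfig controls the part-file prefix and suffix produced by the
// streaming file sink, independent of any Hive TBLPROPERTIES.
val fileConfig = OutputFileConfig.builder()
  .withPartPrefix("part")
  .withPartSuffix(".parquet")
  .build()

val sink: StreamingFileSink[MyRecord] = StreamingFileSink
  .forBulkFormat(
    new Path("hdfs:///user/hive/warehouse/xx"), // hypothetical table location
    ParquetAvroWriters.forReflectRecord(classOf[MyRecord]))
  .withOutputFileConfig(fileConfig)
  .build() {code}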
 

 

 

  was:
When I use Flink streaming to write into Hive, I set the following Hive parameter:
{code:java}
TBLPROPERTIES ('hive.output.file.extension'='.parquet') {code}
When I checked the files in HDFS, I could not find any files with the .parquet suffix, but the parameter takes effect normally when I write via SQL in Hive. Am I using it incorrectly, or is there another reason?

The following is my demo:

 
{code:java}
val tableEnvSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tableEnvSettings)

val catalog = new HiveCatalog("myHive", "xx", "/usr/local/xx/conf")

tEnv.registerCatalog("myHive", catalog)
tEnv.useCatalog("myHive")
tEnv.useDatabase("xx")

// When using the Hive dialect, the flink-sql-connector-hive jar must be placed under flink/lib
tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
val createTSql: String =
  s"""
    |create table if not exists $hiveTable (
    | ...
    |)
    |...
    |TBLPROPERTIES (
    |  'sink.parallelism'='1',
    |  'partition.time-extractor.timestamp-pattern'='$$dt',
    |  'sink.shuffle-by-partition.enable'='true',
    |  'sink.partition-commit.policy.kind'='metastore,success-file',
    |  'hive.output.file.extension'='.parquet'
    |)
    |""".stripMargin
tEnv.executeSql(createTSql)
tEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

tEnv.createTemporaryView("t_xx", DataStream)

val insertSql: String =
  s"""
     |  insert into $hiveTable
     |  select *
     |  from t_xx
    |""".stripMargin

tEnv.executeSql(insertSql)
tEnv.dropTemporaryView("t_userAction") {code}

> File suffix parameter does not take effect when Flink streaming writes into a Hive table
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-27950
>                 URL: https://issues.apache.org/jira/browse/FLINK-27950
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.13.2
>         Environment: flink:flink-1.13.2
> hive: 1.1.0-cdh5.14.0
>            Reporter: yantao
>            Priority: Major
>              Labels: flink-connector-hive
>
> When I use Flink streaming to write into Hive, I set the following table property:
> {code:java}
> TBLPROPERTIES ('hive.output.file.extension'='.parquet') {code}
> When I check the files in HDFS, I can't find any files with the ".parquet" suffix, but the property takes effect normally when I write via SQL in Hive.
> Am I using it incorrectly, or is there another reason?
> This is my demo:
>  
> {code:java}
> val tableEnvSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tableEnvSettings)
> val catalog = new HiveCatalog("myHive", "xx", "/usr/local/xx/conf")
> tEnv.registerCatalog("myHive", catalog)
> tEnv.useCatalog("myHive")
> tEnv.useDatabase("xx")
> tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> val createTSql: String =
>   s"""
>     |create table if not exists $hiveTable (
>     | ...
>     |)
>     |...
>     |TBLPROPERTIES (
>     |  'sink.parallelism'='1',
>     |  'partition.time-extractor.timestamp-pattern'='$$dt',
>     |  'sink.shuffle-by-partition.enable'='true',
>     |  'sink.partition-commit.policy.kind'='metastore,success-file',
>     |  'hive.output.file.extension'='.parquet'
>     |)
>     |""".stripMargin
> tEnv.executeSql(createTSql)
> tEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
> tEnv.createTemporaryView("t_xx", DataStream)
> val insertSql: String =
>   s"""
>      |  insert into $hiveTable
>      |  select *
>      |  from t_xx
>     |""".stripMargin
> tEnv.executeSql(insertSql)
> tEnv.dropTemporaryView("t_xx") {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
