[
https://issues.apache.org/jira/browse/FLINK-27950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yantao updated FLINK-27950:
---------------------------
Description:
When I use Flink streaming to write into Hive, I set the following parameter:
{code:java}
TBLPROPERTIES ('hive.output.file.extension'='.parquet') {code}
I can't find any files with the ".parquet" suffix when I check HDFS, but the
property takes effect normally when I use Hive SQL.
Am I using it correctly, or is there another reason?
This is my demo:
{code:java}
val tableEnvSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
// env is the StreamExecutionEnvironment created earlier
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tableEnvSettings)
val catalog = new HiveCatalog("myHive", "xx", "/usr/local/xx/conf")
tEnv.registerCatalog("myHive", catalog)
tEnv.useCatalog("myHive")
tEnv.useDatabase("xx")
tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
val createTSql: String =
  s"""
     |create table if not exists $hiveTable (
     |  ...
     |)
     |...
     |TBLPROPERTIES (
     |  'sink.parallelism'='1',
     |  'partition.time-extractor.timestamp-pattern'='$$dt',
     |  'sink.shuffle-by-partition.enable'='true',
     |  'sink.partition-commit.policy.kind'='metastore,success-file',
     |  'hive.output.file.extension'='.parquet'
     |)
     |""".stripMargin
tEnv.executeSql(createTSql)
tEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
// dataStream is the source DataStream registered as a temporary view
tEnv.createTemporaryView("t_xx", dataStream)
val insertSql: String =
  s"""
     |insert into $hiveTable
     |select *
     |from t_xx
     |""".stripMargin
tEnv.executeSql(insertSql)
tEnv.dropTemporaryView("t_xx") {code}
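For reference, a quick way to see which suffix the sink actually produced is to list the partition directory and group the file names by extension (streaming part files are typically named "part-..." with no extension unless one is configured). A minimal local sketch; the helper name is hypothetical, and the directory path must be replaced with the real partition directory:
{code:java}
// Hypothetical helper: count files in a directory by extension, to verify
// whether ".parquet" is actually being appended to the committed files.
object SuffixCheck {
  def suffixCounts(names: Seq[String]): Map[String, Int] =
    names
      .groupBy { n =>
        val i = n.lastIndexOf('.')
        if (i > 0) n.substring(i) else "(no extension)"
      }
      .map { case (ext, group) => ext -> group.size }

  def main(args: Array[String]): Unit = {
    // Replace "." with the real partition directory (e.g. an HDFS mount).
    val dir = new java.io.File(args.headOption.getOrElse("."))
    val names = Option(dir.listFiles()).getOrElse(Array.empty[java.io.File])
      .map(_.getName).toSeq
    suffixCounts(names).foreach { case (ext, n) => println(s"$ext: $n") }
  }
} {code}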
was:
When I use Flink streaming to write into Hive, I set the following parameter:
{code:java}
TBLPROPERTIES ('hive.output.file.extension'='.parquet') {code}
I can't find any files with the "," suffix when I check HDFS, but the property
takes effect normally when I use Hive SQL.
Am I using it correctly, or is there another reason?
This is my demo:
{code:java}
val tableEnvSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
// env is the StreamExecutionEnvironment created earlier
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tableEnvSettings)
val catalog = new HiveCatalog("myHive", "xx", "/usr/local/xx/conf")
tEnv.registerCatalog("myHive", catalog)
tEnv.useCatalog("myHive")
tEnv.useDatabase("xx")
tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
val createTSql: String =
  s"""
     |create table if not exists $hiveTable (
     |  ...
     |)
     |...
     |TBLPROPERTIES (
     |  'sink.parallelism'='1',
     |  'partition.time-extractor.timestamp-pattern'='$$dt',
     |  'sink.shuffle-by-partition.enable'='true',
     |  'sink.partition-commit.policy.kind'='metastore,success-file',
     |  'hive.output.file.extension'='.parquet'
     |)
     |""".stripMargin
tEnv.executeSql(createTSql)
tEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
// dataStream is the source DataStream registered as a temporary view
tEnv.createTemporaryView("t_xx", dataStream)
val insertSql: String =
  s"""
     |insert into $hiveTable
     |select *
     |from t_xx
     |""".stripMargin
tEnv.executeSql(insertSql)
tEnv.dropTemporaryView("t_xx") {code}
> Flink streaming writes to a Hive table: the configured file suffix parameter does not take effect
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-27950
> URL: https://issues.apache.org/jira/browse/FLINK-27950
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.13.2
> Environment: flink:flink-1.13.2
> hive: 1.1.0-cdh5.14.0
> Reporter: yantao
> Priority: Major
> Labels: flink-connector-hive
>
> When I use Flink streaming to write into Hive, I set the following parameter:
> {code:java}
> TBLPROPERTIES ('hive.output.file.extension'='.parquet') {code}
> I can't find any files with the ".parquet" suffix when I check HDFS, but the
> property takes effect normally when I use Hive SQL.
> Am I using it correctly, or is there another reason?
> This is my demo:
>
> {code:java}
> val tableEnvSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
> // env is the StreamExecutionEnvironment created earlier
> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tableEnvSettings)
> val catalog = new HiveCatalog("myHive", "xx", "/usr/local/xx/conf")
> tEnv.registerCatalog("myHive", catalog)
> tEnv.useCatalog("myHive")
> tEnv.useDatabase("xx")
> tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> val createTSql: String =
>   s"""
>      |create table if not exists $hiveTable (
>      |  ...
>      |)
>      |...
>      |TBLPROPERTIES (
>      |  'sink.parallelism'='1',
>      |  'partition.time-extractor.timestamp-pattern'='$$dt',
>      |  'sink.shuffle-by-partition.enable'='true',
>      |  'sink.partition-commit.policy.kind'='metastore,success-file',
>      |  'hive.output.file.extension'='.parquet'
>      |)
>      |""".stripMargin
> tEnv.executeSql(createTSql)
> tEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
> // dataStream is the source DataStream registered as a temporary view
> tEnv.createTemporaryView("t_xx", dataStream)
> val insertSql: String =
>   s"""
>      |insert into $hiveTable
>      |select *
>      |from t_xx
>      |""".stripMargin
> tEnv.executeSql(insertSql)
> tEnv.dropTemporaryView("t_xx") {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)