[
https://issues.apache.org/jira/browse/FLINK-18549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhu Zhu updated FLINK-18549:
----------------------------
Release Note: (was: my hadoop version is 2.6-cdh 5.8 , I upgrade it to
hadoop 3.1.2 ,it is successed)
> flink 1.11 can not commit partition automatically
> -------------------------------------------------
>
> Key: FLINK-18549
> URL: https://issues.apache.org/jira/browse/FLINK-18549
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.0
> Reporter: Jun Zhang
> Priority: Major
> Fix For: 1.11.2
>
>
> I use the sql of flink 1.11, read from kafka and writing to hdfs, I found
> that the partition cannot be submitted automatically. This is my complete
> code。
> My checkpoint interval is 10s. I think it should be normal that there will be
> _SUCCESS file under the partition of hdfs every 10s, but in fact there is no
>
> {code:java}
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> bsEnv.enableCheckpointing(10000);
> bsEnv.setParallelism(1);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
> String sqlSource = "CREATE TABLE source_kafka (\n" +
> " appName STRING,\n" +
> " appVersion STRING,\n" +
> " uploadTime STRING\n" +
> ") WITH (\n" +
> " 'connector.type' = 'kafka', \n" +
> " 'connector.version' = '0.10',\n" +
> " 'connector.topic' = 'test_topic',\n" +
> " 'connector.properties.zookeeper.connect' =
> 'localhost:2181',\n" +
> " 'connector.properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> " 'connector.properties.group.id' = 'testGroup',\n"
> +
> " 'format.type'='json',\n" +
> " 'update-mode' = 'append' )";
> tEnv.executeSql(sqlSource);
> String sql = "CREATE TABLE fs_table (\n" +
> " appName STRING,\n" +
> " appVersion STRING,\n" +
> " uploadTime STRING,\n" +
> " dt STRING," +
> " h string" +
> ") PARTITIONED BY (dt,h) WITH (\n" +
> " 'connector'='filesystem',\n" +
> " 'path'='hdfs://localhost/tmp/',\n" +
> " 'sink.partition-commit.policy.kind' = 'success-file',
> " +
> " 'format'='orc'\n" +
> ")";
> tEnv.executeSql(sql);
> String insertSql = "insert into fs_table SELECT appName
> ,appVersion,uploadTime, " +
> " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'),
> DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
> tEnv.executeSql(insertSql);
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)