qw2qw2 commented on issue #10779:
URL: https://github.com/apache/hudi/issues/10779#issuecomment-2288248931
> Could you give a full reproduce code? include hive qeury sql. I'm not able
to reproduce.
**Below is my code:**
package com.flink.dw.dws;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class HudiTableFlink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000*10);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.getCheckpointConfig().setCheckpointTimeout(6000000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.setStateBackend(new
RocksDBStateBackend("hdfs:///user/xxx/checkpoint/table1",true));
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
env.setParallelism(16);
tableEnv.executeSql(
"CREATE TABLE TRANS_INFO ("+
"ID INT," +
"MERCHANT_ID INT," +
"MERCHANT_NAME STRING," +
"PRODUCT_ID INT," +
"PRODUCT_NAME STRING," +
"TRANS_ID STRING," +
"PS_TRANS_ID STRING," +
"TRANS_CREATED_TIME
TIMESTAMP(3)," +
...
"`YEAR` STRING," +
"`MONTH` STRING," +
"`DAY` STRING," +
"`HOUR` INT," +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' =
'"+ConfigUtil.getConfig().get("kafka.topic.prefix")+"PROD_RDC'," +
" 'properties.bootstrap.servers' = '"+
ConfigUtil.getConfig().get("kafka.bootstrap-servers")+"'," +
" 'properties.group.id' = '2024081400'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json'" +
")"
);
tableEnv.executeSql(
"CREATE TABLE table1(" +
"ID INT," +
"MERCHANT_ID INT," +
"MERCHANT_NAME STRING," +
"PRODUCT_ID INT," +
"PRODUCT_NAME STRING," +
"TRANS_ID STRING," +
"PS_TRANS_ID STRING," +
"TRANS_CREATED_TIME
TIMESTAMP(3)," +
...
"`YEAR` STRING," +
"`MONTH` STRING," +
"`DAY` STRING," +
"`HOUR` INT," +
"TRANS_YEAR STRING,"+
"TRANS_MONTH STRING "+
//"TRANS_DAY STRING"+
")" +
"PARTITIONED BY (TRANS_YEAR ,TRANS_MONTH)" +
"WITH(" +
" 'connector' = 'hudi'," +
" 'path'='hdfs:///user/xxx/table1'," +
" 'table.type' = 'COPY_ON_WRITE'," +
" 'write.operation' = 'upsert'," +
"
'hoodie.datasource.write.hive_style_partitioning' = 'true',"+
" 'hoodie.datasource.write.recordkey.field' =
'ID'," +
" 'write.precombine.field' =
'TRANS_CREATED_TIME'," +
"
'hoodie.datasource.write.hive_style_partitioning' = 'true',"+
" 'hive_sync.enable' = 'true'," +
" 'hive_sync.mode' = 'hms'," +
"
'hive_sync.conf.dir'='hdfs:///user/hive/public/conf'," +
" 'hive_sync.db' = 'dbname'," +
" 'hive_sync.table' = 'table1'," +
" 'hive_sync.partition_fields' = 'TRANS_YEAR
,TRANS_MONTH'," +
" 'hive_sync.partition_extractor_class' =
'org.apache.hudi.hive.MultiPartKeysValueExtractor'"+
")"
);
tableEnv.executeSql("insert into table1 ( " +
"ID ," +
"MERCHANT_ID ," +
"MERCHANT_NAME ," +
"PRODUCT_ID ," +
"PRODUCT_NAME ," +
"TRANS_ID ," +
"TRANS_CREATED_TIME ," +
...
"`YEAR` ," +
"`MONTH` ," +
"`DAY` ," +
"`HOUR` ," +
"TRANS_YEAR," +
"TRANS_MONTH " +
")" +
"SELECT " +
"ID ," +
"MERCHANT_ID ," +
"MERCHANT_NAME ," +
"PRODUCT_ID ," +
"PRODUCT_NAME ," +
"TRANS_ID ," +
"TRANS_CREATED_TIME ," +
...
"`YEAR` ," +
"`MONTH` ," +
"`DAY` ," +
"`HOUR` ," +
"`YEAR` as TRANS_YEAR," +
"`MONTH` as TRANS_MONTH " +
"FROM TRANS_INFO");
}
}
**and my hive sql:**
select * from table1 where trans_year='2024' and trans_month='7' and
id=1698350 limit 10;
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]