fanfanAlice commented on issue #10700:
URL: https://github.com/apache/hudi/issues/10700#issuecomment-1953436012

   flink sql task1:
   read kafka topic data join hbase table insert to hudi table table1.
   kafka topic only keeps data for three hours.
   There are also hundreds of millions of kafka data and hbase data every day, 
and often tasks fail due to resource issues.
   Because the banking system task fails and cannot be restarted immediately, 
the task must be restarted during the production window.
   By the time the window production period is reached, kafka topic data may 
have been lost.
   So there's another flink task that backs up kafka topic's data in real time. 
this is flink sql task2.
   When task1 fails, read the task2 hudi table data join hbase table to be 
reinserted to the hudi table written by task1. this is flink sql task3
   
   CREATE TABLE hbase_mb6_page_bus(rowkey string,
    info row<
    bus                   string
   ,bdha_hd_tx_time          string
   ,hd_tx_date               string
    >,
    PRIMARY KEY(rowkey) NOT ENFORCED
   ) with ('connector'='hbase-2.2',
   'table-name'='mb6_page_bus',
   'zookeeper.quorum'='xxxx:24002,xxxx:24002,xxxx:24002',
   'properties.hbase.security.authentication'='kerberos', 
   
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
   
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
   );
   
   CREATE TABLE hbase_mb_videomanage(rowkey string,
    info row<
    bvm_videoname            string
   ,bvm_videourl             string
   ,bvm_videolength          string
   ,bvm_starttime            string
   ,bvm_endtime              string
   ,bvm_videotitle           string
   ,bvm_video                string
   ,bvm_imgurl               string
   ,bvm_channel              string
   ,bvm_watchnum             bigint
   ,bvm_messageurl           string
   ,bvm_belongcolumn         string
   ,bvm_keyword              string
   ,bvm_isallowinteract      string
   ,bvm_isallowbulletscreem  string
   ,bvm_isallowshare         string
   ,bvm_watchcrowd           string
   ,bvm_out                  string
   ,bvm_recommend            string
   ,bvm_sharetitle           string
   ,bvm_sharecontent         string
   ,bvm_correlationvideo     string
   ,bvm_comperenumber        string
   ,bvm_recommendorder       bigint
   ,bvm_video_label          bigint
   ,bvm_org_code             string
   ,bvm_channle_id           string
   ,bvm_hot_flag             string
   ,bvm_operator_id          string
   ,bvm_video_flag           string
   ,bvm_recommend_time       string
   ,bvm_imgurl6              string
   ,bvm_configtype           string
   ,bvm_modify_backimage     string
   ,bvm_backimage            string
   ,bvm_isposition           string
   ,bvm_videourl_new         string
   ,bvm_messageurl_new       string
   ,bdha_hd_tx_time          string
   ,hd_tx_date               string
    >,
    PRIMARY KEY(rowkey) NOT ENFORCED
   ) with ('connector'='hbase-2.2',
   'table-name'='mb_videomanage',
   'zookeeper.quorum'='xxxxx:24002,xxxxx:24002,xxxxx:24002',
   'properties.hbase.security.authentication'='kerberos', 
   
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
   
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
   );
   
   
   
   create table cabs_task_mgm_m2 (
   id string,
   shareId string,
   sharerUserId string,
   sharerEcifId string,
   sharedUserId string,
   sharedEcifId string,
   bindTime string,
   bindStatus string,
   finishTime string,
    procTime as PROCTIME()
   ) WITH (
    'connector' = 'kafka-token',
    'topic' = 'CABS-TASK-MGM-M2',
    'properties.topic' = 'CABS-TASK-MGM-M2',
    'properties.rdeg.console' = 'rdeg-chn-slb-pj-1.bocomm.com:8089',
    'properties.rdeg.location' = 'zj',
    'properties.rdeg.token' = 
'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzeXN0ZW1OYW1lIjoiREJCSSIsInJ1bkVudiI6InByZCIsIm5vd1RpbWUiOjE2ODQ5Nzk3MDQ3NTh9.De5kUMfRtCRgNPa3SsjJoHpufTQ9uHSN0c9y2DCekK8',
    'properties.group.id' = 'g1',
    'scan.startup.mode' = 'latest-offset', --latest-offset/earliest-offset
    'format' = 'json',
    'properties.flink.partition-discovery.interval-millis' = '1000',
    'properties.zookeeper.sasl.client' = 'false',
    'properties.zookeeper.sasl.clientconfig' = 'false'
    );
   --select * from kafka_rcsp_label_table;
       
   create table streaming_rcsp_label(
   id string  PRIMARY KEY NOT ENFORCED,
   shareId string ,
   sharerUserId string,
   sharerEcifId string,
   sharedUserId string,
   sharedEcifId string,
   bindTime string,
   bindStatus string,
   finishTime string,
    procTime TIMESTAMP,
   hd_tx_date_h string,
   hd_tx_date string,
   bvm_org_code string,
   bus string
   )
   with (
    'connector' = 'hudi',
    'path' = 
'hdfs://hacluster/user/hive/warehouse/kylinstreamdb.db/streaming_rcsp_label',
   'hoodie.datasource.write.recordkey.field' = 'uuid',
   'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
    'hoodie.parquet.max.file.size' = '268435456',
    'write.tasks' = '4',
    'write.bucket_assign.tasks'='1',
    'write.task.max.size'='1024',
    'write.rate.limit'='30000',
    --'hoodie.datasource.write.precombine.field'='ts',
    'table.type'='MERGE_ON_READ',
    'changelog.enable'='true',
     'write.operation'='INSERT',
    'compaction.tasks'='1',
    'compaction.delta_commits'='5',
    'compaction.max_memory'='500',
    'hive_sync.enable'='true',
    'hive_sync.mode'='hms',
    'hive_sync.table'='streaming_rcsp_label',
    'hive_sync.db'='kylinstreamdb',
    'hive_sync.support_timestamp'='true',
   'hive_sync_metastore.uris'='thrift://xxxxx:21088,thrift://xxxxx:21088',
   'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
   'hive_sync.use_kerberos'='true',
   
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'hive_sync.hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
    );
    
   insert into streaming_rcsp_label(
    id                    
   ,shareId
   ,sharerUserId
   ,sharerEcifId
   ,sharedUserId
   ,sharedEcifId
   ,bindTime
   ,bindStatus
   ,finishTime
   ,procTime
   ,hd_tx_date_h
   ,hd_tx_date
   ,bvm_org_code
   ,bus
   ,sysdt
   )
   select 
   t1.id,
   t1.shareId ,
   t1.sharerUserId ,
   t1.sharerEcifId ,
   t1.sharedUserId ,
   t1.sharedEcifId ,
   t1.bindTime ,
   t1.bindStatus,
   t1.finishTime,
   t1.procTime,
   t2.bvm_org_code,
   t3.bus,
   cast(NOW() as string) as sysdt, 
   concat(substr(cast(NOW() as string),1, 13), ':00:00') as hd_tx_date_h,
   substr(cast(NOW() as string),1, 10) as hd_tx_date
   from kafka_rcsp_label_table t1
   left join  hbase_mb_videomanage for system_time as of t1.procTime as t2
   on t1.videoCode=t2.rowkey
   left join hbase_mb6_page_bus for system_time as of t1.procTime as t3
   on t1.mbk_usdf_evnt_id=t3.rowkey;
   
   
   
   flink sql task2: backup kafka topic to hudi
   create table cabs_task_mgm_m2 (
   id string,
   shareId string,
   sharerUserId string,
   sharerEcifId string,
   sharedUserId string,
   sharedEcifId string,
   bindTime string,
   bindStatus string,
   finishTime string,
    procTime as PROCTIME()
   ) WITH (
    'connector' = 'kafka-token',
    'topic' = 'CABS-TASK-MGM-M2',
    'properties.topic' = 'CABS-TASK-MGM-M2',
    'properties.rdeg.console' = 'rdeg-chn-slb-pj-1.bocomm.com:8089',
    'properties.rdeg.location' = 'zj',
    'properties.rdeg.token' = 
'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzeXN0ZW1OYW1lIjoiREJCSSIsInJ1bkVudiI6InByZCIsIm5vd1RpbWUiOjE2ODQ5Nzk3MDQ3NTh9.De5kUMfRtCRgNPa3SsjJoHpufTQ9uHSN0c9y2DCekK8',
    'properties.group.id' = 'g1',
    'scan.startup.mode' = 'latest-offset', --latest-offset/earliest-offset
    'format' = 'json',
    'properties.flink.partition-discovery.interval-millis' = '1000',
    'properties.zookeeper.sasl.client' = 'false',
    'properties.zookeeper.sasl.clientconfig' = 'false'
    );
   -- The kafka connector authentication method has been modified
   create table streaming1_cabs_task_mgm_m2(
   id string  PRIMARY KEY NOT ENFORCED,
   shareId string ,
   sharerUserId string,
   sharerEcifId string,
   sharedUserId string,
   sharedEcifId string,
   bindTime string,
   bindStatus string,
   finishTime string,
    procTime TIMESTAMP,
   hd_tx_date_h string,
   hd_tx_date string
   ) with (
   'connector' = 'hudi',
   'path' = 
'hdfs://hacluster/user/kylin/flink/data/streaming1_cabs_task_mgm_m2',
   'hoodie.datasource.write.recordkey.field' = 'id',
   'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
   'hoodie.parquet.max.file.size' = '268435456',
   'write.tasks' = '4',
   'write.bucket_assign.tasks'='1',
   'write.task.max.size'='1024',
   'write.rate.limit'='30000',
   'hoodie.datasource.write.precombine.field'='ts',
   'table.type'='MERGE_ON_READ',
   --'index.type'='BUCKET',
   'write.operation'='UPSERT',
   'changelog.enable'='true',
   'compaction.tasks'='1',
   'compaction.async.enable'='false',
   'compaction.delta_commits'='5',
   'compaction.max_memory'='500',
   'hive_sync.enable'='true',
   'hive_sync.mode'='hms',
   'hive_sync.table'='streaming1_cabs_task_mgm_m2',
   'hive_sync.db'='kylinstreamdb',
   'hive_sync.support_timestamp'='true',
   'hive_sync_metastore.uris'='thrift://xxxxx:xxxx,thrift://xxxxxx:xxxx',
   'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
   'hive_sync.use_kerberos'='true',
   
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'hive_sync.hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
   );
   
   insert into streaming1_cabs_task_mgm_m2(
   id,
   shareId  ,
   sharerUserId ,
   sharerEcifId ,
   sharedUserId ,
   sharedEcifId ,
   bindTime ,
   bindStatus ,
   finishTime ,
    procTime ,
    hd_tx_date_h,
    hd_tx_date
   )
   select 
   id ,
   shareId  ,
   sharerUserId ,
   sharerEcifId ,
   sharedUserId ,
   sharedEcifId ,
   bindTime ,
   bindStatus ,
   finishTime ,
    procTime ,
   concat(substr(cast(procTime as string),1, 13), ':00:00') as hd_tx_date_h,
   substr(cast(procTime as string),1, 10)   hd_tx_date
   from cabs_task_mgm_m2 t1 where id is not null;
   
   
   
   flink sql task3:  When task1 fails, read the task2 hudi table data join 
hbase table to be reinserted to the hudi table written by task1
   
   create table streaming1_cabs_task_mgm_m2(
   id string  PRIMARY KEY NOT ENFORCED,
   shareId string ,
   sharerUserId string,
   sharerEcifId string,
   sharedUserId string,
   sharedEcifId string,
   bindTime string,
   bindStatus string,
   finishTime string,
    procTime TIMESTAMP,
   hd_tx_date_h string,
   hd_tx_date string
   ) with (
   'connector' = 'hudi',
   'path' = 
'hdfs://hacluster/user/kylin/flink/data/streaming1_cabs_task_mgm_m2',
   'hoodie.datasource.write.recordkey.field' = 'id',
   'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
   'hoodie.parquet.max.file.size' = '268435456',
   'write.tasks' = '4',
   'write.bucket_assign.tasks'='1',
   'write.task.max.size'='1024',
   'write.rate.limit'='30000',
   'hoodie.datasource.write.precombine.field'='ts',
   'table.type'='MERGE_ON_READ',
   --'index.type'='BUCKET',
   'write.operation'='UPSERT',
   'changelog.enable'='true',
   'compaction.tasks'='1',
   'compaction.async.enable'='false',
   'compaction.delta_commits'='5',
   'compaction.max_memory'='500',
   'hive_sync.enable'='true',
   'hive_sync.mode'='hms',
   'hive_sync.table'='streaming1_cabs_task_mgm_m2',
   'hive_sync.db'='kylinstreamdb',
   'hive_sync.support_timestamp'='true',
   'hive_sync_metastore.uris'='thrift://xxxxx:xxxx,thrift://xxxxxx:xxxx',
   'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
   'hive_sync.use_kerberos'='true',
   
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'hive_sync.hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
   );
   
   CREATE TABLE hbase_mb6_page_bus(rowkey string,
    info row<
    bus                   string
   ,bdha_hd_tx_time          string
   ,hd_tx_date               string
    >,
    PRIMARY KEY(rowkey) NOT ENFORCED
   ) with ('connector'='hbase-2.2',
   'table-name'='mb6_page_bus',
   'zookeeper.quorum'='xxxx:24002,xxxx:24002,xxxx:24002',
   'properties.hbase.security.authentication'='kerberos', 
   
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
   
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
   );
   
   CREATE TABLE hbase_mb_videomanage(rowkey string,
    info row<
    bvm_videoname            string
   ,bvm_videourl             string
   ,bvm_videolength          string
   ,bvm_starttime            string
   ,bvm_endtime              string
   ,bvm_videotitle           string
   ,bvm_video                string
   ,bvm_imgurl               string
   ,bvm_channel              string
   ,bvm_watchnum             bigint
   ,bvm_messageurl           string
   ,bvm_belongcolumn         string
   ,bvm_keyword              string
   ,bvm_isallowinteract      string
   ,bvm_isallowbulletscreem  string
   ,bvm_isallowshare         string
   ,bvm_watchcrowd           string
   ,bvm_out                  string
   ,bvm_recommend            string
   ,bvm_sharetitle           string
   ,bvm_sharecontent         string
   ,bvm_correlationvideo     string
   ,bvm_comperenumber        string
   ,bvm_recommendorder       bigint
   ,bvm_video_label          bigint
   ,bvm_org_code             string
   ,bvm_channle_id           string
   ,bvm_hot_flag             string
   ,bvm_operator_id          string
   ,bvm_video_flag           string
   ,bvm_recommend_time       string
   ,bvm_imgurl6              string
   ,bvm_configtype           string
   ,bvm_modify_backimage     string
   ,bvm_backimage            string
   ,bvm_isposition           string
   ,bvm_videourl_new         string
   ,bvm_messageurl_new       string
   ,bdha_hd_tx_time          string
   ,hd_tx_date               string
    >,
    PRIMARY KEY(rowkey) NOT ENFORCED
   ) with ('connector'='hbase-2.2',
   'table-name'='mb_videomanage',
   'zookeeper.quorum'='xxxxx:24002,xxxxx:24002,xxxxx:24002',
   'properties.hbase.security.authentication'='kerberos', 
   
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
   
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
   );
   
   create table streaming_rcsp_label(
   id string  PRIMARY KEY NOT ENFORCED,
   shareId string ,
   sharerUserId string,
   sharerEcifId string,
   sharedUserId string,
   sharedEcifId string,
   bindTime string,
   bindStatus string,
   finishTime string,
    procTime TIMESTAMP,
   hd_tx_date_h string,
   hd_tx_date string,
   bvm_org_code string,
   bus string
   )
   with (
    'connector' = 'hudi',
    'path' = 
'hdfs://hacluster/user/hive/warehouse/kylinstreamdb.db/streaming_rcsp_label',
   'hoodie.datasource.write.recordkey.field' = 'uuid',
   'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
    'hoodie.parquet.max.file.size' = '268435456',
    'write.tasks' = '4',
    'write.bucket_assign.tasks'='1',
    'write.task.max.size'='1024',
    'write.rate.limit'='30000',
    --'hoodie.datasource.write.precombine.field'='ts',
    'table.type'='MERGE_ON_READ',
    'changelog.enable'='true',
     'write.operation'='INSERT',
    'compaction.tasks'='1',
    'compaction.delta_commits'='5',
    'compaction.max_memory'='500',
    'hive_sync.enable'='true',
    'hive_sync.mode'='hms',
    'hive_sync.table'='streaming_rcsp_label',
    'hive_sync.db'='kylinstreamdb',
    'hive_sync.support_timestamp'='true',
   'hive_sync_metastore.uris'='thrift://xxxxx:21088,thrift://xxxxx:21088',
   'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
   'hive_sync.use_kerberos'='true',
   
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
   
'hive_sync.hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
    );
   
   insert into streaming_rcsp_label(
    id                    
   ,shareId
   ,sharerUserId
   ,sharerEcifId
   ,sharedUserId
   ,sharedEcifId
   ,bindTime
   ,bindStatus
   ,finishTime
   ,procTime
   ,hd_tx_date_h
   ,hd_tx_date
   ,bvm_org_code
   ,bus
   ,sysdt
   )
   select 
   t1.id,
   t1.shareId ,
   t1.sharerUserId ,
   t1.sharerEcifId ,
   t1.sharedUserId ,
   t1.sharedEcifId ,
   t1.bindTime ,
   t1.bindStatus,
   t1.finishTime,
   t1.procTime,
   t2.bvm_org_code,
   t3.bus,
   cast(NOW() as string) as sysdt, 
   concat(substr(cast(NOW() as string),1, 13), ':00:00') as hd_tx_date_h,
   substr(cast(NOW() as string),1, 10) as hd_tx_date
   from streaming1_cabs_task_mgm_m2 t1
   left join  hbase_mb_videomanage for system_time as of t1.procTime as t2
   on t1.videoCode=t2.rowkey
   left join hbase_mb6_page_bus for system_time as of t1.procTime as t3
   on t1.mbk_usdf_evnt_id=t3.rowkey where hd_tx_date >= '2023-12-11' and 
hd_tx_date <= '2023-12-12';
   
   
   An error occurred when task3 was executed.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to