fanfanAlice commented on issue #10700:
URL: https://github.com/apache/hudi/issues/10700#issuecomment-1953436012
flink sql task1:
reads data from a Kafka topic, joins it with an HBase table, and inserts the result into Hudi table table1.
kafka topic only keeps data for three hours.
There are also hundreds of millions of kafka data and hbase data every day,
and often tasks fail due to resource issues.
Because the banking system task fails and cannot be restarted immediately,
the task must be restarted during the production window.
By the time the production window opens, the Kafka topic data may
already have been lost.
So there's another flink task that backs up kafka topic's data in real time.
this is flink sql task2.
When task1 fails, task3 reads the task2 Hudi table, joins it with the HBase
tables, and re-inserts the result into the Hudi table written by task1. This is Flink SQL task3.
-- HBase dimension table (page -> business lookup) used as the temporal-join
-- side in the INSERT statements below. The 'info' column family is exposed
-- as a nested ROW: query its fields as info.bus, info.hd_tx_date, etc.
CREATE TABLE hbase_mb6_page_bus(rowkey string,
info row<
bus string
,bdha_hd_tx_time string
,hd_tx_date string
>,
PRIMARY KEY(rowkey) NOT ENFORCED
) with ('connector'='hbase-2.2',
'table-name'='mb6_page_bus',
-- NOTE(review): hostnames and Kerberos principals below look redacted
-- ('xxxx', '....'); substitute real values before running.
'zookeeper.quorum'='xxxx:24002,xxxx:24002,xxxx:24002',
'properties.hbase.security.authentication'='kerberos',
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
-- Region server and master share one keytab file on this deployment.
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
);
-- HBase dimension table (video management metadata) used as the other
-- temporal-join side in the INSERT statements below. All qualifiers of the
-- 'info' column family are mapped into one nested ROW; access fields as
-- info.bvm_org_code, info.bvm_videoname, etc.
CREATE TABLE hbase_mb_videomanage(rowkey string,
info row<
bvm_videoname string
,bvm_videourl string
,bvm_videolength string
,bvm_starttime string
,bvm_endtime string
,bvm_videotitle string
,bvm_video string
,bvm_imgurl string
,bvm_channel string
,bvm_watchnum bigint
,bvm_messageurl string
,bvm_belongcolumn string
,bvm_keyword string
,bvm_isallowinteract string
,bvm_isallowbulletscreem string
,bvm_isallowshare string
,bvm_watchcrowd string
,bvm_out string
,bvm_recommend string
,bvm_sharetitle string
,bvm_sharecontent string
,bvm_correlationvideo string
,bvm_comperenumber string
,bvm_recommendorder bigint
,bvm_video_label bigint
,bvm_org_code string
,bvm_channle_id string
,bvm_hot_flag string
,bvm_operator_id string
,bvm_video_flag string
,bvm_recommend_time string
,bvm_imgurl6 string
,bvm_configtype string
,bvm_modify_backimage string
,bvm_backimage string
,bvm_isposition string
,bvm_videourl_new string
,bvm_messageurl_new string
,bdha_hd_tx_time string
,hd_tx_date string
>,
PRIMARY KEY(rowkey) NOT ENFORCED
) with ('connector'='hbase-2.2',
'table-name'='mb_videomanage',
-- NOTE(review): hostnames and Kerberos principals below look redacted
-- ('xxxxx', '....'); substitute real values before running.
'zookeeper.quorum'='xxxxx:24002,xxxxx:24002,xxxxx:24002',
'properties.hbase.security.authentication'='kerberos',
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
-- Region server and master share one keytab file on this deployment.
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
);
-- Kafka source table for topic CABS-TASK-MGM-M2, read through the in-house
-- 'kafka-token' connector. procTime is a processing-time attribute
-- (PROCTIME()), which downstream queries use for lookup joins and for
-- deriving hourly/daily partition values.
create table cabs_task_mgm_m2 (
id string,
shareId string,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
procTime as PROCTIME()
) WITH (
'connector' = 'kafka-token',
-- NOTE(review): 'topic' and 'properties.topic' carry the same value;
-- presumably the custom connector requires the 'properties.*' copy — confirm.
'topic' = 'CABS-TASK-MGM-M2',
'properties.topic' = 'CABS-TASK-MGM-M2',
'properties.rdeg.console' = 'rdeg-chn-slb-pj-1.bocomm.com:8089',
'properties.rdeg.location' = 'zj',
-- SECURITY(review): this JWT is a live credential; keep it out of committed
-- SQL and inject it via secure configuration instead.
'properties.rdeg.token' =
'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzeXN0ZW1OYW1lIjoiREJCSSIsInJ1bkVudiI6InByZCIsIm5vd1RpbWUiOjE2ODQ5Nzk3MDQ3NTh9.De5kUMfRtCRgNPa3SsjJoHpufTQ9uHSN0c9y2DCekK8',
'properties.group.id' = 'g1',
'scan.startup.mode' = 'latest-offset', --latest-offset/earliest-offset
'format' = 'json',
'properties.flink.partition-discovery.interval-millis' = '1000',
'properties.zookeeper.sasl.client' = 'false',
'properties.zookeeper.sasl.clientconfig' = 'false'
);
--select * from kafka_rcsp_label_table;
-- Hudi sink table for task1 (MERGE_ON_READ, INSERT mode, partitioned by
-- hd_tx_date, synced into Hive database kylinstreamdb).
create table streaming_rcsp_label(
id string PRIMARY KEY NOT ENFORCED,
shareId string,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
procTime TIMESTAMP,
hd_tx_date_h string,
hd_tx_date string,
bvm_org_code string,
bus string
)
with (
'connector' = 'hudi',
'path' =
'hdfs://hacluster/user/hive/warehouse/kylinstreamdb.db/streaming_rcsp_label',
-- FIX: the table declares no 'uuid' column, so Hudi cannot extract the
-- record key; use the declared primary-key column 'id' instead.
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
'hoodie.parquet.max.file.size' = '268435456',
'write.tasks' = '4',
'write.bucket_assign.tasks'='1',
'write.task.max.size'='1024',
'write.rate.limit'='30000',
'table.type'='MERGE_ON_READ',
'changelog.enable'='true',
'write.operation'='INSERT',
'compaction.tasks'='1',
'compaction.delta_commits'='5',
'compaction.max_memory'='500',
'hive_sync.enable'='true',
'hive_sync.mode'='hms',
'hive_sync.table'='streaming_rcsp_label',
'hive_sync.db'='kylinstreamdb',
'hive_sync.support_timestamp'='true',
-- FIX: option-key typo — the Hudi Flink option is 'hive_sync.metastore.uris',
-- not 'hive_sync_metastore.uris' (the misspelled key was silently ignored,
-- so Hive sync could not reach the metastore).
'hive_sync.metastore.uris'='thrift://xxxxx:21088,thrift://xxxxx:21088',
'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
'hive_sync.use_kerberos'='true',
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
-- FIX: doubled prefix — 'hive_sync.hive_sync.hive_conf' is not a valid key.
-- NOTE(review): confirm the intended option; 'hive_sync.conf.dir' above may
-- already cover this hive-site.xml.
'hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
);
-- Task1 enrichment: join the Kafka stream against the HBase dimension tables
-- and write the result into the Hudi sink.
-- FIX: the SELECT list is reordered to match the INSERT column list —
-- previously bvm_org_code/bus/sysdt were positionally mapped into
-- hd_tx_date_h/hd_tx_date/bvm_org_code, silently scrambling the columns.
-- FIX: dropped 'sysdt' — streaming_rcsp_label declares no such column, so a
-- 15-item INSERT list cannot match the 14-column target schema.
-- NOTE(review): 'kafka_rcsp_label_table' and its join keys videoCode /
-- mbk_usdf_evnt_id are not defined anywhere in this script (the
-- cabs_task_mgm_m2 Kafka DDL above declares neither key) — confirm the real
-- source DDL.
insert into streaming_rcsp_label(
id
,shareId
,sharerUserId
,sharerEcifId
,sharedUserId
,sharedEcifId
,bindTime
,bindStatus
,finishTime
,procTime
,hd_tx_date_h
,hd_tx_date
,bvm_org_code
,bus
)
select
t1.id,
t1.shareId,
t1.sharerUserId,
t1.sharerEcifId,
t1.sharedUserId,
t1.sharedEcifId,
t1.bindTime,
t1.bindStatus,
t1.finishTime,
t1.procTime,
-- Hourly and daily partition values derived from wall-clock time.
concat(substr(cast(NOW() as string),1, 13), ':00:00') as hd_tx_date_h,
substr(cast(NOW() as string),1, 10) as hd_tx_date,
-- FIX: the HBase connector exposes column-family qualifiers as fields of the
-- nested 'info' ROW, so they must be accessed as info.<qualifier>.
t2.info.bvm_org_code as bvm_org_code,
t3.info.bus as bus
from kafka_rcsp_label_table t1
left join hbase_mb_videomanage for system_time as of t1.procTime as t2
on t1.videoCode=t2.rowkey
left join hbase_mb6_page_bus for system_time as of t1.procTime as t3
on t1.mbk_usdf_evnt_id=t3.rowkey;
flink sql task2: backup kafka topic to hudi
-- Kafka source table for task2 (same topic/connector as task1's source).
-- procTime is a processing-time attribute (PROCTIME()) used below to derive
-- hourly/daily partition values for the Hudi backup table.
create table cabs_task_mgm_m2 (
id string,
shareId string,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
procTime as PROCTIME()
) WITH (
'connector' = 'kafka-token',
-- NOTE(review): 'topic' and 'properties.topic' carry the same value;
-- presumably the custom connector requires the 'properties.*' copy — confirm.
'topic' = 'CABS-TASK-MGM-M2',
'properties.topic' = 'CABS-TASK-MGM-M2',
'properties.rdeg.console' = 'rdeg-chn-slb-pj-1.bocomm.com:8089',
'properties.rdeg.location' = 'zj',
-- SECURITY(review): this JWT is a live credential; keep it out of committed
-- SQL and inject it via secure configuration instead.
'properties.rdeg.token' =
'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzeXN0ZW1OYW1lIjoiREJCSSIsInJ1bkVudiI6InByZCIsIm5vd1RpbWUiOjE2ODQ5Nzk3MDQ3NTh9.De5kUMfRtCRgNPa3SsjJoHpufTQ9uHSN0c9y2DCekK8',
'properties.group.id' = 'g1',
'scan.startup.mode' = 'latest-offset', --latest-offset/earliest-offset
'format' = 'json',
'properties.flink.partition-discovery.interval-millis' = '1000',
'properties.zookeeper.sasl.client' = 'false',
'properties.zookeeper.sasl.clientconfig' = 'false'
);
-- The kafka connector authentication method has been modified
-- Hudi backup table mirroring the Kafka topic (task2 sink, later the task3
-- source): MERGE_ON_READ, upserted on 'id', partitioned by hd_tx_date.
create table streaming1_cabs_task_mgm_m2(
id string PRIMARY KEY NOT ENFORCED,
shareId string,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
procTime TIMESTAMP,
hd_tx_date_h string,
hd_tx_date string
) with (
'connector' = 'hudi',
'path' =
'hdfs://hacluster/user/kylin/flink/data/streaming1_cabs_task_mgm_m2',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
'hoodie.parquet.max.file.size' = '268435456',
'write.tasks' = '4',
'write.bucket_assign.tasks'='1',
'write.task.max.size'='1024',
'write.rate.limit'='30000',
-- FIX: the table declares no 'ts' column, so precombining on 'ts' fails at
-- write time; deduplicate on the existing 'procTime' timestamp instead.
'hoodie.datasource.write.precombine.field'='procTime',
'table.type'='MERGE_ON_READ',
'write.operation'='UPSERT',
'changelog.enable'='true',
'compaction.tasks'='1',
'compaction.async.enable'='false',
'compaction.delta_commits'='5',
'compaction.max_memory'='500',
'hive_sync.enable'='true',
'hive_sync.mode'='hms',
'hive_sync.table'='streaming1_cabs_task_mgm_m2',
'hive_sync.db'='kylinstreamdb',
'hive_sync.support_timestamp'='true',
-- FIX: option-key typo — 'hive_sync.metastore.uris', not
-- 'hive_sync_metastore.uris' (the misspelled key was silently ignored).
'hive_sync.metastore.uris'='thrift://xxxxx:xxxx,thrift://xxxxxx:xxxx',
'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
'hive_sync.use_kerberos'='true',
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
-- FIX: doubled prefix — 'hive_sync.hive_sync.hive_conf' is not a valid key.
'hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
);
-- Task2 backup: copy the Kafka stream one-to-one into the Hudi mirror table,
-- deriving the hourly and daily partition columns from the processing time.
-- Rows without an id are skipped (id is the Hudi record key).
insert into streaming1_cabs_task_mgm_m2(
id, shareId, sharerUserId, sharerEcifId, sharedUserId, sharedEcifId,
bindTime, bindStatus, finishTime, procTime, hd_tx_date_h, hd_tx_date
)
select
t1.id,
t1.shareId,
t1.sharerUserId,
t1.sharerEcifId,
t1.sharedUserId,
t1.sharedEcifId,
t1.bindTime,
t1.bindStatus,
t1.finishTime,
t1.procTime,
-- Truncate 'yyyy-MM-dd HH' from the timestamp and pad to a full hour.
concat(substr(cast(t1.procTime as string),1, 13), ':00:00') as hd_tx_date_h,
-- Daily partition value: the 'yyyy-MM-dd' prefix of the timestamp.
substr(cast(t1.procTime as string),1, 10) as hd_tx_date
from cabs_task_mgm_m2 t1
where t1.id is not null;
flink sql task3: When task1 fails, read the task2 hudi table data join
hbase table to be reinserted to the hudi table written by task1
-- Task3 re-declaration of the Hudi backup table written by task2; task3
-- reads it as the replay source. Must stay schema-identical to the task2
-- definition (path, key, partition field).
-- NOTE(review): procTime here is a plain TIMESTAMP column, not a time
-- attribute — see the task3 INSERT below for the temporal-join implication.
create table streaming1_cabs_task_mgm_m2(
id string PRIMARY KEY NOT ENFORCED,
shareId string,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
procTime TIMESTAMP,
hd_tx_date_h string,
hd_tx_date string
) with (
'connector' = 'hudi',
'path' =
'hdfs://hacluster/user/kylin/flink/data/streaming1_cabs_task_mgm_m2',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
'hoodie.parquet.max.file.size' = '268435456',
'write.tasks' = '4',
'write.bucket_assign.tasks'='1',
'write.task.max.size'='1024',
'write.rate.limit'='30000',
-- FIX: the table declares no 'ts' column, so precombining on 'ts' fails;
-- deduplicate on the existing 'procTime' timestamp instead (must match the
-- task2 definition of this table).
'hoodie.datasource.write.precombine.field'='procTime',
'table.type'='MERGE_ON_READ',
'write.operation'='UPSERT',
'changelog.enable'='true',
'compaction.tasks'='1',
'compaction.async.enable'='false',
'compaction.delta_commits'='5',
'compaction.max_memory'='500',
'hive_sync.enable'='true',
'hive_sync.mode'='hms',
'hive_sync.table'='streaming1_cabs_task_mgm_m2',
'hive_sync.db'='kylinstreamdb',
'hive_sync.support_timestamp'='true',
-- FIX: option-key typo — 'hive_sync.metastore.uris', not
-- 'hive_sync_metastore.uris' (the misspelled key was silently ignored).
'hive_sync.metastore.uris'='thrift://xxxxx:xxxx,thrift://xxxxxx:xxxx',
'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
'hive_sync.use_kerberos'='true',
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
-- FIX: doubled prefix — 'hive_sync.hive_sync.hive_conf' is not a valid key.
'hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
);
-- Task3 re-declaration of the HBase page/bus dimension table (identical to
-- the task1 definition). The 'info' column family is exposed as a nested
-- ROW; access fields as info.bus, info.hd_tx_date, etc.
CREATE TABLE hbase_mb6_page_bus(rowkey string,
info row<
bus string
,bdha_hd_tx_time string
,hd_tx_date string
>,
PRIMARY KEY(rowkey) NOT ENFORCED
) with ('connector'='hbase-2.2',
'table-name'='mb6_page_bus',
-- NOTE(review): hostnames and Kerberos principals below look redacted
-- ('xxxx', '....'); substitute real values before running.
'zookeeper.quorum'='xxxx:24002,xxxx:24002,xxxx:24002',
'properties.hbase.security.authentication'='kerberos',
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
);
-- Task3 re-declaration of the HBase video-management dimension table
-- (identical to the task1 definition). All 'info' family qualifiers are
-- mapped into one nested ROW; access fields as info.bvm_org_code, etc.
CREATE TABLE hbase_mb_videomanage(rowkey string,
info row<
bvm_videoname string
,bvm_videourl string
,bvm_videolength string
,bvm_starttime string
,bvm_endtime string
,bvm_videotitle string
,bvm_video string
,bvm_imgurl string
,bvm_channel string
,bvm_watchnum bigint
,bvm_messageurl string
,bvm_belongcolumn string
,bvm_keyword string
,bvm_isallowinteract string
,bvm_isallowbulletscreem string
,bvm_isallowshare string
,bvm_watchcrowd string
,bvm_out string
,bvm_recommend string
,bvm_sharetitle string
,bvm_sharecontent string
,bvm_correlationvideo string
,bvm_comperenumber string
,bvm_recommendorder bigint
,bvm_video_label bigint
,bvm_org_code string
,bvm_channle_id string
,bvm_hot_flag string
,bvm_operator_id string
,bvm_video_flag string
,bvm_recommend_time string
,bvm_imgurl6 string
,bvm_configtype string
,bvm_modify_backimage string
,bvm_backimage string
,bvm_isposition string
,bvm_videourl_new string
,bvm_messageurl_new string
,bdha_hd_tx_time string
,hd_tx_date string
>,
PRIMARY KEY(rowkey) NOT ENFORCED
) with ('connector'='hbase-2.2',
'table-name'='mb_videomanage',
-- NOTE(review): hostnames and Kerberos principals below look redacted
-- ('xxxxx', '....'); substitute real values before running.
'zookeeper.quorum'='xxxxx:24002,xxxxx:24002,xxxxx:24002',
'properties.hbase.security.authentication'='kerberos',
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
);
-- Task3 re-declaration of task1's Hudi sink table; must stay
-- schema-identical to the task1 definition (path, key, partition field).
create table streaming_rcsp_label(
id string PRIMARY KEY NOT ENFORCED,
shareId string,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
procTime TIMESTAMP,
hd_tx_date_h string,
hd_tx_date string,
bvm_org_code string,
bus string
)
with (
'connector' = 'hudi',
'path' =
'hdfs://hacluster/user/hive/warehouse/kylinstreamdb.db/streaming_rcsp_label',
-- FIX: the table declares no 'uuid' column, so Hudi cannot extract the
-- record key; use the declared primary-key column 'id' instead.
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
'hoodie.parquet.max.file.size' = '268435456',
'write.tasks' = '4',
'write.bucket_assign.tasks'='1',
'write.task.max.size'='1024',
'write.rate.limit'='30000',
'table.type'='MERGE_ON_READ',
'changelog.enable'='true',
'write.operation'='INSERT',
'compaction.tasks'='1',
'compaction.delta_commits'='5',
'compaction.max_memory'='500',
'hive_sync.enable'='true',
'hive_sync.mode'='hms',
'hive_sync.table'='streaming_rcsp_label',
'hive_sync.db'='kylinstreamdb',
'hive_sync.support_timestamp'='true',
-- FIX: option-key typo — 'hive_sync.metastore.uris', not
-- 'hive_sync_metastore.uris' (the misspelled key was silently ignored).
'hive_sync.metastore.uris'='thrift://xxxxx:21088,thrift://xxxxx:21088',
'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
'hive_sync.use_kerberos'='true',
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom....@hadoop.mrs.xx.write.bocom.com',
-- FIX: doubled prefix — 'hive_sync.hive_sync.hive_conf' is not a valid key.
-- NOTE(review): confirm the intended option; 'hive_sync.conf.dir' above may
-- already cover this hive-site.xml.
'hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
);
-- Task3 backfill: replay the backed-up Kafka data from the Hudi mirror,
-- re-join the HBase dimensions, and re-insert into task1's sink table for
-- the given partition window.
-- FIX: the SELECT list is reordered to match the INSERT column list —
-- previously bvm_org_code/bus/sysdt were positionally mapped into
-- hd_tx_date_h/hd_tx_date/bvm_org_code, silently scrambling the columns.
-- FIX: dropped 'sysdt' — streaming_rcsp_label declares no such column, so a
-- 15-item INSERT list cannot match the 14-column target schema.
-- NOTE(review): streaming1_cabs_task_mgm_m2 declares neither 'videoCode' nor
-- 'mbk_usdf_evnt_id'; these join keys must exist in the backup table's
-- schema or this query cannot resolve — confirm against the real schema.
-- NOTE(review): t1.procTime read back from Hudi is a plain TIMESTAMP, not a
-- processing-time attribute, so the planner is expected to reject
-- FOR SYSTEM_TIME AS OF t1.procTime here — likely the reported task3
-- failure. Consider adding a computed 'proc AS PROCTIME()' column to the
-- source table and joining on that instead.
insert into streaming_rcsp_label(
id
,shareId
,sharerUserId
,sharerEcifId
,sharedUserId
,sharedEcifId
,bindTime
,bindStatus
,finishTime
,procTime
,hd_tx_date_h
,hd_tx_date
,bvm_org_code
,bus
)
select
t1.id,
t1.shareId,
t1.sharerUserId,
t1.sharerEcifId,
t1.sharedUserId,
t1.sharedEcifId,
t1.bindTime,
t1.bindStatus,
t1.finishTime,
t1.procTime,
-- Hourly and daily partition values derived from wall-clock replay time.
concat(substr(cast(NOW() as string),1, 13), ':00:00') as hd_tx_date_h,
substr(cast(NOW() as string),1, 10) as hd_tx_date,
-- FIX: the HBase connector exposes column-family qualifiers as fields of the
-- nested 'info' ROW, so they must be accessed as info.<qualifier>.
t2.info.bvm_org_code as bvm_org_code,
t3.info.bus as bus
from streaming1_cabs_task_mgm_m2 t1
left join hbase_mb_videomanage for system_time as of t1.procTime as t2
on t1.videoCode=t2.rowkey
left join hbase_mb6_page_bus for system_time as of t1.procTime as t3
on t1.mbk_usdf_evnt_id=t3.rowkey
-- Backfill window filter, qualified to the source's partition column so it
-- cannot be confused with the freshly derived hd_tx_date alias above.
where t1.hd_tx_date >= '2023-12-11' and t1.hd_tx_date <= '2023-12-12';
An error occurred when task3 was executed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]