tieke1121 opened a new issue #4223:
URL: https://github.com/apache/hudi/issues/4223
Hi,

I am using Flink to consume data from Kafka and the Side Outputs feature to split the stream into different types of business data, each of which is then written to a different Hudi table. However, after I submit the job on YARN, multiple Flink jobs are generated instead of a single one.
**Steps to reproduce the behavior:**

1. Consume data from Kafka with Flink.
2. Use Flink Side Outputs to split the stream into different types of business data, converting the records to `Row`.
3. Write each data type into a different Hudi table.
4. Multiple Flink jobs are generated on YARN.
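For reference, the side-output routing in steps 2–3 can be sketched roughly as below (a minimal sketch against the Flink 1.12 DataStream API; `Event`, `toRow`, and the tag names are hypothetical placeholders, not taken from the actual job):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputRouting {

    // One tag per business data type; the anonymous subclass preserves
    // the Row type information for Flink's type extraction.
    static final OutputTag<Row> DYNAMIC_TAG = new OutputTag<Row>("dynamicaccountinfo") {};
    static final OutputTag<Row> ENTITY_TAG  = new OutputTag<Row>("entityaccount") {};

    static void route(DataStream<Event> kafkaStream) {
        SingleOutputStreamOperator<Row> routed = kafkaStream
            .process(new ProcessFunction<Event, Row>() {
                @Override
                public void processElement(Event e, Context ctx, Collector<Row> out) {
                    // Route each record to the side output for its business type.
                    // Event and toRow(...) are illustrative stand-ins.
                    if ("dynamicaccountinfo".equals(e.type)) {
                        ctx.output(DYNAMIC_TAG, toRow(e));
                    } else if ("entityaccount".equals(e.type)) {
                        ctx.output(ENTITY_TAG, toRow(e));
                    }
                }
            });

        // Each side output then feeds its own Hudi table sink.
        DataStream<Row> dynamicStream = routed.getSideOutput(DYNAMIC_TAG);
        DataStream<Row> entityStream  = routed.getSideOutput(ENTITY_TAG);
    }
}
```

Side outputs themselves all live inside one job graph, so the routing alone should not cause multiple YARN applications.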
**Expected behavior**
A single Flink job submitted to YARN, writing all side-output streams to their respective Hudi tables.
**Environment Description**
* Hudi version: 0.9.0
* Hive version: 2.1.1
* Hadoop version: 3.0.0
* Storage (HDFS/S3/GCS..): HDFS
* Running on Docker? (yes/no): no
**Stacktrace**
1. YARN submission log:
[root@tslaver2 flink-1.12.2]# bin/flink run -c
com.hly.bigdata.ingestion.FlinkStreamer -m yarn-cluster -d -yjm 1024 -ytm 4096
-p 4 -ys 3 -ynm accesscard ../ingestion/streaming-1.0.jar --kafka-topic
hly_accesscard --kafka-group-id RRR --kafka-bootstrap-servers 172.20.2.62:9092
--flink-checkpoint-path hdfs://tslaver5:8020/data/flink/checkpoint/ingestion
--source-ingestion-path ../ingestion/ingestion.json --source-ingestion-sql-path
../ingestion/ingestion.sql
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
2021-12-06 11:10:42,414 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli
[] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-12-06 11:10:42,414 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli
[] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-12-06 11:10:47,739 WARN
org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The
configuration directory ('/opt/flink-1.12.2/conf') already contains a LOG4J
config file.If you want to use logback, then please delete or rename the log
configuration file.
2021-12-06 11:10:47,797 INFO org.apache.hadoop.yarn.client.RMProxy
[] - Connecting to ResourceManager at tslaver4/10.0.2.61:8032
2021-12-06 11:10:48,000 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - No path for the flink jar passed. Using the location of
class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-12-06 11:10:48,227 INFO org.apache.hadoop.conf.Configuration
[] - resource-types.xml not found
2021-12-06 11:10:48,228 INFO
org.apache.hadoop.yarn.util.resource.ResourceUtils [] - Unable to
find 'resource-types.xml'.
2021-12-06 11:10:48,339 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - Cluster specification:
ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096,
slotsPerTaskManager=3}
2021-12-06 11:10:59,098 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - Submitting application master application_1638168337345_0101
2021-12-06 11:10:59,200 INFO
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted
application application_1638168337345_0101
2021-12-06 11:10:59,200 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - Waiting for the cluster to be allocated
2021-12-06 11:10:59,204 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - Deploying cluster, current state ACCEPTED
2021-12-06 11:11:23,553 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - YARN application has been deployed successfully.
2021-12-06 11:11:23,555 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - The Flink YARN session cluster has been started in detached
mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1638168337345_0101
If this should not be possible, then you can also kill Flink via YARN's web
interface or via:
$ yarn application -kill application_1638168337345_0101
Note that killing Flink might not clean up all job artifacts and temporary
files.
2021-12-06 11:11:23,555 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - Found Web Interface tmaster:34273 of application
'application_1638168337345_0101'.
Job has been submitted with JobID dc075888925c46329e84b40d6e2109d0
2021-12-06 11:11:23,796 INFO org.apache.hadoop.yarn.client.RMProxy
[] - Connecting to ResourceManager at tslaver4/10.0.2.61:8032
2021-12-06 11:11:23,797 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - No path for the flink jar passed. Using the location of
class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-12-06 11:11:23,820 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - Cluster specification:
ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096,
slotsPerTaskManager=3}
2021-12-06 11:11:36,763 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - Submitting application master application_1638168337345_0102
2021-12-06 11:11:37,018 INFO
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted
application application_1638168337345_0102
2021-12-06 11:11:37,018 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - Waiting for the cluster to be allocated
2021-12-06 11:11:37,050 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - Deploying cluster, current state ACCEPTED
2021-12-06 11:12:00,198 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - YARN application has been deployed successfully.
2021-12-06 11:12:00,199 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - The Flink YARN session cluster has been started in detached
mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1638168337345_0102
If this should not be possible, then you can also kill Flink via YARN's web
interface or via
2. Flink SQL
2.1 For business data 1:
SELECT id AS uuid, id_old, id_base, account_id, dynamic_date
, comsume_no, dynamic_sum, beforedynamic_sum, afterdynamic_sum,
dynamicacc_id
, dynamicacc_name, borrow_account_type, borrow_accountid,
borrow_accountname, card_id
, vehicle_plate, borrow_openaccountbankno,
borrow_openaccountbankname, borrow_organizationno, loan_account_type
, loan_accountid, loan_accountname, loan_openaccountbankno,
loan_openaccountbankname, loan_organizationno
, billing_id, charge_channelid, charge_channel, charge_pathid,
charge_path
, platform_resultcode, platform_resultmsg, charge_result,
deal_userid, deal_username
, charge_desc, consume_time, create_time, update_time,
check_time,create_time AS ts,DATE_FORMAT(create_time, 'yyyy-MM-dd') AS pt
FROM accesscard_dynamicaccountinfo_temp;
CREATE TABLE `accesscard_dynamicaccountinfo`(
`uuid` bigint,
`id_old` varchar(50),
`id_base` bigint,
`account_id` bigint,
`dynamic_date` TIMESTAMP(3),
`comsume_no` varchar(100),
`dynamic_sum` decimal(16,4),
`beforedynamic_sum` decimal(16,4),
`afterdynamic_sum` decimal(16,4),
`dynamicacc_id` varchar(50),
`dynamicacc_name` varchar(50),
`borrow_account_type` int,
`borrow_accountid` varchar(32),
`borrow_accountname` varchar(50),
`card_id` bigint,
`vehicle_plate` varchar(20),
`borrow_openaccountbankno` varchar(50),
`borrow_openaccountbankname` varchar(50),
`borrow_organizationno` varchar(50),
`loan_account_type` int,
`loan_accountid` varchar(50),
`loan_accountname` varchar(50),
`loan_openaccountbankno` varchar(50),
`loan_openaccountbankname` varchar(50),
`loan_organizationno` varchar(50),
`billing_id` varchar(500),
`charge_channelid` varchar(50),
`charge_channel` varchar(50),
`charge_pathid` varchar(50),
`charge_path` varchar(50),
`platform_resultcode` varchar(10),
`platform_resultmsg` varchar(100),
`charge_result` varchar(50),
`deal_userid` bigint,
`deal_username` varchar(50),
`charge_desc` varchar(100),
`consume_time` TIMESTAMP(3),
`create_time` TIMESTAMP(3),
`update_time` TIMESTAMP(3),
`check_time` TIMESTAMP(3),
`ts` TIMESTAMP(3),
`pt` varchar(20)
)
PARTITIONED BY (`pt`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://tslaver5:8020/data/hudi/accesscard_dynamicaccountinfo',
'table.type' = 'MERGE_ON_READ',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms' ,
'hive_sync.metastore.uris' = 'thrift://tmaster:9083',
'hive_sync.jdbc_url'='jdbc:hive2://tslaver5:10000',
'hive_sync.table'='accesscard_dynamicaccountinfo',
'hive_sync.db'='ods',
'hive_sync.username'='',
'hive_sync.password'='',
'read.tasks'='3',
'write.bucket_assign.tasks'='3',
'write.tasks'='3',
'write.index_bootstrap.tasks'='2',
'compaction.tasks'='2'
);
2.2 For business data 2:
SELECT id AS uuid, id_old, id_base, etc_channel, status
, account_name, account_pwd, is_message_tip, owner_type, account_type
, relation_name, linkman_gender, birthday, certifi_type, certifi_lic
, phone_no, address, post_code, email, mobile
, fax_no, transactor, trans_cer_type, trans_cer_no, transcer_phone_no
, transcer_mobile, creater, creater_old, balance, dot_id
, dot_id_old, dot_name, user_id, user_id_old, user_name
, parent_id, parent_id_old, dept_id, dept_name, real_name
, referee, org_id, org_id_old, org_name, trace_no
, refund_status, is_white_list, delay_doblack_def, verifystate,
protocolstate
, create_time, update_time, invoice_mobile, organization_name,
version,create_time AS ts,DATE_FORMAT(create_time, 'yyyy-MM-dd') AS pt
FROM accesscard_entityaccount_temp;
CREATE TABLE `accesscard_entityaccount` (
`uuid` bigint,
`id_old` varchar(32),
`id_base` bigint,
`etc_channel` varchar(10),
`status` int,
`account_name` varchar(300),
`account_pwd` varchar(255),
`is_message_tip` int,
`owner_type` int,
`account_type` int,
`relation_name` varchar(255),
`linkman_gender` int,
`birthday` varchar(255),
`certifi_type` varchar(50),
`certifi_lic` varchar(50),
`phone_no` varchar(20),
`address` varchar(255),
`post_code` varchar(10),
`email` varchar(100),
`mobile` varchar(20),
`fax_no` varchar(20),
`transactor` varchar(20),
`trans_cer_type` int,
`trans_cer_no` varchar(50),
`transcer_phone_no` varchar(20),
`transcer_mobile` varchar(20),
`creater` bigint,
`creater_old` varchar(32),
`balance` decimal(16,4),
`dot_id` varchar(255),
`dot_id_old` varchar(255),
`dot_name` varchar(200),
`user_id` bigint,
`user_id_old` varchar(32),
`user_name` varchar(50),
`parent_id` bigint,
`parent_id_old` varchar(32),
`dept_id` bigint,
`dept_name` varchar(50),
`real_name` varchar(50),
`referee` varchar(50),
`org_id` bigint,
`org_id_old` varchar(32),
`org_name` varchar(100),
`trace_no` varchar(50),
`refund_status` int,
`is_white_list` varchar(2),
`delay_doblack_def` int,
`verifystate` varchar(50),
`protocolstate` varchar(50),
`create_time` TIMESTAMP(3),
`update_time` TIMESTAMP(3),
`invoice_mobile` varchar(20),
`organization_name` varchar(100),
`version` bigint,
`ts` TIMESTAMP(3),
`pt` varchar(20)
)
PARTITIONED BY (`pt`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://tslaver5:8020/data/hudi/accesscard_entityaccount',
'table.type' = 'MERGE_ON_READ',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms' ,
'hive_sync.metastore.uris' = 'thrift://tmaster:9083',
'hive_sync.jdbc_url'='jdbc:hive2://tslaver5:10000',
'hive_sync.table'='accesscard_entityaccount',
'hive_sync.db'='ods',
'hive_sync.username'='',
'hive_sync.password'='',
'read.tasks'='3',
'write.bucket_assign.tasks'='3',
'write.tasks'='3',
'write.index_bootstrap.tasks'='2',
'compaction.tasks'='2'
);
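One possible cause (an assumption, not confirmed by this issue): if FlinkStreamer executes each INSERT statement separately (e.g. one `executeSql`/`execute` call per statement), every call produces its own job graph and therefore its own YARN submission, which would match the two applications in the log above. Grouping both INSERTs into a single `StatementSet` yields one job. A sketch against the Flink 1.12 Table API, with the SELECT bodies elided:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class SingleJobSubmit {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // DDL for the temp views and the two Hudi tables goes here (omitted).

        // Both INSERTs are added to one StatementSet, so a single job graph
        // containing both Hudi sinks is compiled and submitted to YARN.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql(
            "INSERT INTO accesscard_dynamicaccountinfo SELECT ... ");
        set.addInsertSql(
            "INSERT INTO accesscard_entityaccount SELECT ... ");
        set.execute();
    }
}
```

If the streamer already uses a `StatementSet` and multiple jobs still appear, the duplicate YARN submission in the log (application `_0101` and `_0102`) would need to be investigated separately.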