tieke1121 opened a new issue #4223:
URL: https://github.com/apache/hudi/issues/4223


   Hi,
   I am using Flink to consume data from Kafka, and I use the Side Outputs feature to split the stream into different kinds of business data, each of which is then sunk to a different Hudi table. After I submit the job on YARN, multiple Flink jobs are generated.
   
   Steps to reproduce the behavior:
   
   1. Consume data from Kafka with Flink.
   2. Use Flink Side Outputs to collect the different kinds of business data and convert them to Row.
   3. Write each type of data into a different Hudi table.
   4. Multiple Flink jobs are generated on YARN.
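   For reference, the routing in steps 2-3 can be sketched without any Flink dependencies as a plain demultiplexer: each record carries a business type and is collected into a per-type bucket, which is conceptually what the `OutputTag` side outputs do before each bucket is written to its own Hudi table. The class and field names below are hypothetical, not taken from the actual job:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Dependency-free sketch of the side-output routing described in steps 2-3.
// Each incoming record is tagged with a business type and demultiplexed into
// a per-type bucket; in the real job each bucket corresponds to one OutputTag
// side output feeding one Hudi table sink. All names here are illustrative.
class SideOutputSketch {

    // A record as it might arrive from Kafka: a business type plus a payload.
    static final class Record {
        final String type;
        final String payload;

        Record(String type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Route records into one bucket per business type (the "side outputs").
    static Map<String, List<String>> route(List<Record> records) {
        Map<String, List<String>> buckets = new HashMap<>();
        for (Record r : records) {
            buckets.computeIfAbsent(r.type, k -> new ArrayList<>()).add(r.payload);
        }
        return buckets;
    }
}
```

   In the Flink job itself this split happens inside a `ProcessFunction` via `Context#output(OutputTag, value)`, and each side-output stream is converted to Row and handed to its own Hudi sink.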
   
   **Expected behavior**
   
   Only a single Flink job (one YARN application) should be submitted for the whole pipeline.
   
   **Environment Description**
   
   * Hudi version :0.9.0
   
   * Hive version :2.1.1
   
   * Hadoop version :3.0.0
   
   * Storage (HDFS/S3/GCS..) :HDFS
   
   * Running on Docker? (yes/no) :NO
   
   
   
   **Stacktrace**
   
   1. YARN submission log:
   [root@tslaver2 flink-1.12.2]# bin/flink run -c 
com.hly.bigdata.ingestion.FlinkStreamer -m yarn-cluster -d -yjm 1024 -ytm 4096 
-p 4 -ys 3 -ynm accesscard ../ingestion/streaming-1.0.jar --kafka-topic 
hly_accesscard --kafka-group-id RRR  --kafka-bootstrap-servers 172.20.2.62:9092 
 --flink-checkpoint-path hdfs://tslaver5:8020/data/flink/checkpoint/ingestion 
--source-ingestion-path ../ingestion/ingestion.json --source-ingestion-sql-path 
../ingestion/ingestion.sql
   Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
   2021-12-06 11:10:42,414 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli  
              [] - Found Yarn properties file under /tmp/.yarn-properties-root.
   2021-12-06 11:10:42,414 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli  
              [] - Found Yarn properties file under /tmp/.yarn-properties-root.
   2021-12-06 11:10:47,739 WARN  
org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The 
configuration directory ('/opt/flink-1.12.2/conf') already contains a LOG4J 
config file.If you want to use logback, then please delete or rename the log 
configuration file.
   2021-12-06 11:10:47,797 INFO  org.apache.hadoop.yarn.client.RMProxy          
              [] - Connecting to ResourceManager at tslaver4/10.0.2.61:8032
   2021-12-06 11:10:48,000 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - No path for the flink jar passed. Using the location of 
class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
   2021-12-06 11:10:48,227 INFO  org.apache.hadoop.conf.Configuration           
              [] - resource-types.xml not found
   2021-12-06 11:10:48,228 INFO  
org.apache.hadoop.yarn.util.resource.ResourceUtils           [] - Unable to 
find 'resource-types.xml'.
   2021-12-06 11:10:48,339 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Cluster specification: 
ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096, 
slotsPerTaskManager=3}
   2021-12-06 11:10:59,098 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Submitting application master application_1638168337345_0101
   2021-12-06 11:10:59,200 INFO  
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted 
application application_1638168337345_0101
   2021-12-06 11:10:59,200 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Waiting for the cluster to be allocated
   2021-12-06 11:10:59,204 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Deploying cluster, current state ACCEPTED
   2021-12-06 11:11:23,553 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - YARN application has been deployed successfully.
   2021-12-06 11:11:23,555 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - The Flink YARN session cluster has been started in detached 
mode. In order to stop Flink gracefully, use the following command:
   $ echo "stop" | ./bin/yarn-session.sh -id application_1638168337345_0101
   If this should not be possible, then you can also kill Flink via YARN's web 
interface or via:
   $ yarn application -kill application_1638168337345_0101
   Note that killing Flink might not clean up all job artifacts and temporary 
files.
   2021-12-06 11:11:23,555 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Found Web Interface tmaster:34273 of application 
'application_1638168337345_0101'.
   Job has been submitted with JobID dc075888925c46329e84b40d6e2109d0
   2021-12-06 11:11:23,796 INFO  org.apache.hadoop.yarn.client.RMProxy          
              [] - Connecting to ResourceManager at tslaver4/10.0.2.61:8032
   2021-12-06 11:11:23,797 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - No path for the flink jar passed. Using the location of 
class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
   2021-12-06 11:11:23,820 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Cluster specification: 
ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096, 
slotsPerTaskManager=3}
   2021-12-06 11:11:36,763 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Submitting application master application_1638168337345_0102
   2021-12-06 11:11:37,018 INFO  
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted 
application application_1638168337345_0102
   2021-12-06 11:11:37,018 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Waiting for the cluster to be allocated
   2021-12-06 11:11:37,050 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Deploying cluster, current state ACCEPTED
   2021-12-06 11:12:00,198 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - YARN application has been deployed successfully.
   2021-12-06 11:12:00,199 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - The Flink YARN session cluster has been started in detached 
mode. In order to stop Flink gracefully, use the following command:
   $ echo "stop" | ./bin/yarn-session.sh -id application_1638168337345_0102
   If this should not be possible, then you can also kill Flink via YARN's web 
interface or via
   
   2. Flink SQL
   2.1 For business data 1:
   SELECT id AS uuid, id_old, id_base, account_id, dynamic_date,
          comsume_no, dynamic_sum, beforedynamic_sum, afterdynamic_sum, dynamicacc_id,
          dynamicacc_name, borrow_account_type, borrow_accountid, borrow_accountname, card_id,
          vehicle_plate, borrow_openaccountbankno, borrow_openaccountbankname, borrow_organizationno, loan_account_type,
          loan_accountid, loan_accountname, loan_openaccountbankno, loan_openaccountbankname, loan_organizationno,
          billing_id, charge_channelid, charge_channel, charge_pathid, charge_path,
          platform_resultcode, platform_resultmsg, charge_result, deal_userid, deal_username,
          charge_desc, consume_time, create_time, update_time, check_time,
          create_time AS ts, DATE_FORMAT(create_time, 'yyyy-MM-dd') AS pt
   FROM accesscard_dynamicaccountinfo_temp;
   
   CREATE TABLE `accesscard_dynamicaccountinfo`(
       `uuid` bigint,
       `id_old` varchar(50),
       `id_base` bigint,
       `account_id` bigint,
       `dynamic_date` TIMESTAMP(3),
       `comsume_no` varchar(100),
       `dynamic_sum` decimal(16,4),
       `beforedynamic_sum` decimal(16,4),
       `afterdynamic_sum` decimal(16,4),
       `dynamicacc_id` varchar(50),
       `dynamicacc_name` varchar(50),
       `borrow_account_type` int,
       `borrow_accountid` varchar(32),
       `borrow_accountname` varchar(50),
       `card_id` bigint,
       `vehicle_plate` varchar(20),
       `borrow_openaccountbankno` varchar(50),
       `borrow_openaccountbankname` varchar(50),
       `borrow_organizationno` varchar(50),
       `loan_account_type` int,
       `loan_accountid` varchar(50),
       `loan_accountname` varchar(50),
       `loan_openaccountbankno` varchar(50),
       `loan_openaccountbankname` varchar(50),
       `loan_organizationno` varchar(50),
       `billing_id` varchar(500),
       `charge_channelid` varchar(50),
       `charge_channel` varchar(50),
       `charge_pathid` varchar(50),
       `charge_path` varchar(50),
       `platform_resultcode` varchar(10),
       `platform_resultmsg` varchar(100),
       `charge_result` varchar(50),
       `deal_userid` bigint,
       `deal_username` varchar(50),
       `charge_desc` varchar(100),
       `consume_time` TIMESTAMP(3),
       `create_time` TIMESTAMP(3),
       `update_time` TIMESTAMP(3),
       `check_time` TIMESTAMP(3),
       `ts` TIMESTAMP(3),
       `pt` varchar(20)
   )
   PARTITIONED BY (`pt`)
   WITH (
   'connector' = 'hudi',
   'path' = 'hdfs://tslaver5:8020/data/hudi/accesscard_dynamicaccountinfo',
   'table.type' = 'MERGE_ON_READ',
   'hive_sync.enable' = 'true',
   'hive_sync.mode' = 'hms' ,
   'hive_sync.metastore.uris' = 'thrift://tmaster:9083',
   'hive_sync.jdbc_url'='jdbc:hive2://tslaver5:10000',
   'hive_sync.table'='accesscard_dynamicaccountinfo',
   'hive_sync.db'='ods',
   'hive_sync.username'='',
   'hive_sync.password'='',
   'read.tasks'='3',
   'write.bucket_assign.tasks'='3',
   'write.tasks'='3',
   'write.index_bootstrap.tasks'='2',
   'compaction.tasks'='2'
   );
   
   2.2 For business data 2:
   
   SELECT id AS uuid, id_old, id_base, etc_channel, status,
          account_name, account_pwd, is_message_tip, owner_type, account_type,
          relation_name, linkman_gender, birthday, certifi_type, certifi_lic,
          phone_no, address, post_code, email, mobile,
          fax_no, transactor, trans_cer_type, trans_cer_no, transcer_phone_no,
          transcer_mobile, creater, creater_old, balance, dot_id,
          dot_id_old, dot_name, user_id, user_id_old, user_name,
          parent_id, parent_id_old, dept_id, dept_name, real_name,
          referee, org_id, org_id_old, org_name, trace_no,
          refund_status, is_white_list, delay_doblack_def, verifystate, protocolstate,
          create_time, update_time, invoice_mobile, organization_name, version,
          create_time AS ts, DATE_FORMAT(create_time, 'yyyy-MM-dd') AS pt
   FROM accesscard_entityaccount_temp;
   
   CREATE TABLE `accesscard_entityaccount` (
     `uuid` bigint,
     `id_old` varchar(32),
     `id_base` bigint,
     `etc_channel` varchar(10),
     `status` int,
     `account_name` varchar(300),
     `account_pwd` varchar(255),
     `is_message_tip` int,
     `owner_type` int,
     `account_type` int,
     `relation_name` varchar(255),
     `linkman_gender` int,
     `birthday` varchar(255),
     `certifi_type` varchar(50),
     `certifi_lic` varchar(50),
     `phone_no` varchar(20),
     `address` varchar(255),
     `post_code` varchar(10),
     `email` varchar(100),
     `mobile` varchar(20),
     `fax_no` varchar(20),
     `transactor` varchar(20),
     `trans_cer_type` int,
     `trans_cer_no` varchar(50),
     `transcer_phone_no` varchar(20),
     `transcer_mobile` varchar(20),
     `creater` bigint,
     `creater_old` varchar(32),
     `balance` decimal(16,4),
     `dot_id` varchar(255),
     `dot_id_old` varchar(255),
     `dot_name` varchar(200),
     `user_id` bigint,
     `user_id_old` varchar(32),
     `user_name` varchar(50),
     `parent_id` bigint,
     `parent_id_old` varchar(32),
     `dept_id` bigint,
     `dept_name` varchar(50),
     `real_name` varchar(50),
     `referee` varchar(50),
     `org_id` bigint,
     `org_id_old` varchar(32),
     `org_name` varchar(100),
     `trace_no` varchar(50),
     `refund_status` int,
     `is_white_list` varchar(2),
     `delay_doblack_def` int,
     `verifystate` varchar(50),
     `protocolstate` varchar(50),
     `create_time` TIMESTAMP(3),
     `update_time` TIMESTAMP(3),
     `invoice_mobile` varchar(20),
     `organization_name` varchar(100),
     `version` bigint,
     `ts` TIMESTAMP(3),
     `pt` varchar(20)
   )
   PARTITIONED BY (`pt`)
   WITH (
   'connector' = 'hudi',
   'path' = 'hdfs://tslaver5:8020/data/hudi/accesscard_entityaccount',
   'table.type' = 'MERGE_ON_READ',
   'hive_sync.enable' = 'true',
   'hive_sync.mode' = 'hms' ,
   'hive_sync.metastore.uris' = 'thrift://tmaster:9083',
   'hive_sync.jdbc_url'='jdbc:hive2://tslaver5:10000',
   'hive_sync.table'='accesscard_entityaccount',
   'hive_sync.db'='ods',
   'hive_sync.username'='',
   'hive_sync.password'='',
   'read.tasks'='3',
   'write.bucket_assign.tasks'='3',
   'write.tasks'='3',
   'write.index_bootstrap.tasks'='2',
   'compaction.tasks'='2'
   );
   
   

