longjuncai1994 opened a new issue, #7953:
URL: https://github.com/apache/hudi/issues/7953

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?  yes
   
   **Describe the problem you faced**
   
   I want to write data to S3 from my local IDE, so I set the AWS credentials in my code (I found the configuration keys in the source code). The configuration seems to work, because no exception is thrown any more. However, the job hangs after the log line "s3a-file-system metrics system started".
   
   ```
   val source =
     env.generateSequence(1, 10)
       .map(d => {
         i += 1
         val rowData = new GenericRowData(4)
         rowData.setField(0, StringData.fromString(i + ""))
         rowData.setField(1, StringData.fromString(d + ""))
         rowData.setField(2, StringData.fromString("p1"))
         rowData.setField(3, TimestampData.fromEpochMillis(1676427048588L))
         println("data:" + rowData.toString)
         rowData.asInstanceOf[RowData]
       }).javaStream

   val path = "s3a://test/hudi"
   val table = "huditable"
   val options = new util.HashMap[String, String]()
   options.put(FlinkOptions.PATH.key, path)
   options.put(FlinkOptions.TABLE_TYPE.key, HoodieTableType.MERGE_ON_READ.name)
   options.put("fs.defaultFS", "s3a://test")
   options.put("hadoop.fs.s3a.access.key", "mykey")
   options.put("hadoop.fs.s3a.secret.key", "mykey")
   options.put("hadoop.fs.s3a.endpoint", "myendpoint")
   options.put("hadoop.fs.s3a.region", "myregion")

   val builder = HoodiePipeline.builder(table)
     .column("uuid VARCHAR(20)")
     .column("content VARCHAR(255)")
     .column("ps VARCHAR(20)")
     .column("ts TIMESTAMP(3)")
     .pk("uuid")
     .partition("ps")
     .options(options)
   builder.sink(source, false)
   env.execute()
   ```
   
   As you can see, I print each record in the map function, but stdout is empty, so no data was processed.
   
   The code does not throw any exception, but it hangs here:
   ```
   [INFO ] 2023-02-15 10:22:01,098:Registering TaskManager with ResourceID e8ee6bfd-f75d-4486-9b7a-45fdf0befd68 (akka://flink/user/rpc/taskmanager_0) at ResourceManager
   [INFO ] 2023-02-15 10:22:01,100:Successful registration at resource manager akka://flink/user/rpc/resourcemanager_1 under registration id 7f51c4797923db47714b2ca1ab8d84ab.
   [INFO ] 2023-02-15 10:22:01,100:Received JobGraph submission 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,101:Submitting job 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,116:Proposing leadership to contender LeaderContender: JobMasterServiceLeadershipRunner
   [INFO ] 2023-02-15 10:22:01,128:Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_3 .
   [INFO ] 2023-02-15 10:22:01,134:Initializing job 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,154:Using restart back off time strategy NoRestartBackoffTimeStrategy for Flink Streaming Job (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,196:Running initialization on master for job Flink Streaming Job (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,196:Successfully ran initialization on master in 0 ms.
   [INFO ] 2023-02-15 10:22:01,292:Built 1 new pipelined regions in 1 ms, total 1 pipelined regions currently.
   [INFO ] 2023-02-15 10:22:01,300:No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@6f4f4b24
   [INFO ] 2023-02-15 10:22:01,300:State backend loader loads the state backend as HashMapStateBackend
   [INFO ] 2023-02-15 10:22:01,303:Checkpoint storage is set to 'jobmanager'
   [INFO ] 2023-02-15 10:22:01,339:No checkpoint found during restore.
   [INFO ] 2023-02-15 10:22:01,346:Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@309d741 for Flink Streaming Job (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,354:Received confirmation of leadership for leader akka://flink/user/rpc/jobmanager_3 , session=9934221e-e0be-4027-a8ce-975fedab52c5
   [INFO ] 2023-02-15 10:22:01,357:Starting execution of job 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041) under job master id a8ce975fedab52c59934221ee0be4027.
   [WARN ] 2023-02-15 10:22:13,961:Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
   [INFO ] 2023-02-15 10:22:13,972:Scheduled Metric snapshot period at 10 second(s).
   [INFO ] 2023-02-15 10:22:13,972:s3a-file-system metrics system started
   ```
   
   I have tried writing the same data with Spark, and it worked well.
   
   So what is the problem with my Flink code?
   
   I could not find more examples of the Flink DataStream API in the Hudi docs.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Generate the records
   
   2. Set the configuration
   
   3. Sink to S3
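
For step 2, here is a minimal sketch of how the S3A entries could be built without hardcoding credentials. The helper name `S3aOptionsSketch.s3aOptions` is hypothetical, and reading from the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` is an assumption. The option keys are copied verbatim from the code in this report and have not been checked against the Hadoop or Hudi documentation. The sketch uses only the Scala and Java standard libraries.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

object S3aOptionsSketch {
  // Builds the "hadoop.fs.s3a.*" entries used in this report.
  // `env` is passed in (instead of reading sys.env directly) so the
  // function can be exercised without real credentials.
  def s3aOptions(env: Map[String, String],
                 endpoint: String,
                 region: String): JMap[String, String] = {
    // Fail fast with a clear message if a credential is missing.
    def required(name: String): String =
      env.getOrElse(name,
        throw new IllegalArgumentException(s"missing environment variable: $name"))

    val options = new JHashMap[String, String]()
    // Keys below are taken as-is from the report (assumption: they are the
    // keys the Hudi Flink writer forwards to the Hadoop configuration).
    options.put("hadoop.fs.s3a.access.key", required("AWS_ACCESS_KEY_ID"))
    options.put("hadoop.fs.s3a.secret.key", required("AWS_SECRET_ACCESS_KEY"))
    options.put("hadoop.fs.s3a.endpoint", endpoint)
    options.put("hadoop.fs.s3a.region", region)
    options
  }
}
```

It could then be merged into the existing map with `options.putAll(S3aOptionsSketch.s3aOptions(sys.env, "myendpoint", "myregion"))` before calling `.options(options)`.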
   
   
   **Environment Description**
   
   * Hudi version : 0.12.2
   
   * Flink version : 1.15.2
   
   * Hadoop version :3.3.4
   
   * Scala version : 2.12
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   
   **Stacktrace**
   
   
   

