bithw1 opened a new issue, #11932:
URL: https://github.com/apache/hudi/issues/11932

   
   Hi,
   I have following code snippet, that I want to 
   1) read the kafka source
   2) write the data into Hudi table
   3) query the hudi table and print on the console
   I am sure that the kafka source can read data from kakfa(I have tested this 
in the code), But, no data is written into the hudi table directory/file, and 
also there is only an `Empty set` is print on the console.
   
   Could someone help me out?Thanks very much
   
   ```
   package org.example.streaming
   
   import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
   import org.apache.flink.table.api.EnvironmentSettings
   import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, _}
   import org.apache.flink.types.Row
   
   object Test002_ReadKafkaAndWriteHudi {
     val hudi_table_name = "Test002_ReadKafkaAndWriteHudi"
   
     def main(args: Array[String]): Unit = {
       val settings = 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
       val env = StreamExecutionEnvironment.getExecutionEnvironment
       val tenv = StreamTableEnvironment.create(env, settings)
       val topic = "topic1"
       val groupId = "Test001_ReadKafka"
       val startupMode = "latest-offset"
       val source_ddl =
         s"""
                 CREATE TABLE kafka_table (
                   a STRING,
                   b  STRING              
                 ) WITH (
                   'connector' = 'kafka',
                   'topic' = '$topic',
                   'properties.bootstrap.servers' = 'localhost:9092',
                   'properties.group.id' = '$groupId',
                   'scan.startup.mode' = '$startupMode',
                   'value.format' = 'json',
                   'value.json.fail-on-missing-field' = 'false',
                   'value.fields-include' = 'EXCEPT_KEY'
                 )
         """.stripMargin(' ')
       tenv.executeSql(source_ddl)
   
       val target_dll =
         s"""
            create table ${hudi_table_name} (
               a varchar(20),
               b varchar(20),
               primary key(a) not enforced
             )
             with (
             'connector' = 'hudi'
             ,'path' = 'file:///D:/data/hudi_demo/${hudi_table_name}'
             ,'table.type' = 'COPY_ON_WRITE'
             ,'table.name' = 'test_table'
             ,'write.insert.drop.duplicates' = 'true'
             ,'write.recordkey.field' = 'a'
             ,'write.precombine.field' = 'b'
             ,'write.shuffle.parallelism' = '1'
             ,'write.commit.max.retries' = '5'
             ,'write.cleaner.policy' = 'KEEP_LATEST_COMMITS'
             )
   
   
         """.stripMargin(' ')
   
   
       tenv.executeSql(target_dll)
   
       val sql =
         s"""
   
         insert into $hudi_table_name select a, b from kafka_table
   
         """.stripMargin(' ')
   
       tenv.executeSql(sql)
   
       val query = s"select * from $hudi_table_name"
       val result = tenv.sqlQuery(query)
       result.execute().print()
   
     }
   
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to