bithw1 opened a new issue, #11932:
URL: https://github.com/apache/hudi/issues/11932
Hi,
I have following code snippet, that I want to
1) read the kafka source
2) write the data into Hudi table
3) query the hudi table and print on the console
I am sure that the kafka source can read data from kakfa(I have tested this
in the code), But, no data is written into the hudi table directory/file, and
also there is only an `Empty set` is print on the console.
Could someone help me out?Thanks very much
```
package org.example.streaming
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row
object Test002_ReadKafkaAndWriteHudi {
val hudi_table_name = "Test002_ReadKafkaAndWriteHudi"
def main(args: Array[String]): Unit = {
val settings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tenv = StreamTableEnvironment.create(env, settings)
val topic = "topic1"
val groupId = "Test001_ReadKafka"
val startupMode = "latest-offset"
val source_ddl =
s"""
CREATE TABLE kafka_table (
a STRING,
b STRING
) WITH (
'connector' = 'kafka',
'topic' = '$topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = '$groupId',
'scan.startup.mode' = '$startupMode',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'EXCEPT_KEY'
)
""".stripMargin(' ')
tenv.executeSql(source_ddl)
val target_dll =
s"""
create table ${hudi_table_name} (
a varchar(20),
b varchar(20),
primary key(a) not enforced
)
with (
'connector' = 'hudi'
,'path' = 'file:///D:/data/hudi_demo/${hudi_table_name}'
,'table.type' = 'COPY_ON_WRITE'
,'table.name' = 'test_table'
,'write.insert.drop.duplicates' = 'true'
,'write.recordkey.field' = 'a'
,'write.precombine.field' = 'b'
,'write.shuffle.parallelism' = '1'
,'write.commit.max.retries' = '5'
,'write.cleaner.policy' = 'KEEP_LATEST_COMMITS'
)
""".stripMargin(' ')
tenv.executeSql(target_dll)
val sql =
s"""
insert into $hudi_table_name select a, b from kafka_table
""".stripMargin(' ')
tenv.executeSql(sql)
val query = s"select * from $hudi_table_name"
val result = tenv.sqlQuery(query)
result.execute().print()
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]