zhangshenghang commented on PR #7361:
URL: https://github.com/apache/seatunnel/pull/7361#issuecomment-2278947992
@Hisoka-X @hailin0
By configuring the protobuf data structure, we can successfully receive and
send related messages.
This is a usage example; I will continue to improve the relevant
documentation and unit tests.
Please help me see where we need to optimize. Thanks.
```
# Defining the runtime environment
env {
parallelism = 1
job.mode = "STREAMING"
flink.execution.checkpointing.interval=5000
flink.execution.restart.strategy = failure-rate
flink.execution.restart.failureInterval = 60000
flink.execution.restart.failureRate = 100
flink.execution.restart.delayInterval = 10000
# execution.restart.strategy = fixed-delay
# execution.restart.attempts = 11
# execution.restart.delayBetweenAttempts = 10000
}
source {
Kafka {
schema = {
fields {
name = string
id = int
email = string
Address {
city = string
state = string
street = string
}
attributes = "map<string,float>"
}
}
topic = "test"
format = protobuf
protobuf_message_name = Person
protobuf_schema = """
syntax = "proto3";
package wiki.hadoop.protobuf;
option java_outer_classname = "Test2";
message Person {
string name = 1;
int32 id = 2;
string email = 3;
message Address {
string street = 1;
string city = 2;
string state = 3;
string zip = 4;
}
Address address = 4;
map<string, float> attributes = 5;
repeated string phone_numbers = 6;
}
"""
consumer.group = "test-202301011"
bootstrap.servers = "172.16.24.194:9092"
kafka.config = {
client.id = client_1
max.poll.records = 500
auto.offset.reset = "latest"
enable.auto.commit = "false"
}
result_table_name = "kafka_table"
}
}
transform {
Sql {
source_table_name = "kafka_table"
result_table_name = "kafka_table2"
query = "select name, Address.state ,Address.city ,attributes.test1 from
kafka_table"
}
}
sink {
Console{
source_table_name = "kafka_table2"
}
}
```
```
# Defining the runtime environment
env {
parallelism = 1
job.mode = "STREAMING"
flink.execution.checkpointing.interval=5000
flink.execution.restart.strategy = failure-rate
flink.execution.restart.failureInterval = 60000
flink.execution.restart.failureRate = 100
flink.execution.restart.delayInterval = 10000
}
source {
FakeSource {
parallelism = 1
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = string
id = int
email = string
Address {
city = string
state = string
street = string
}
attributes = "map<string,float>"
phone_numbers = "array<string>"
}
}
}
}
sink {
Console{}
kafka {
topic = "test_topic"
bootstrap.servers = "172.16.24.194:9092"
format = protobuf
kafka.request.timeout.ms = 60000
# semantics = EXACTLY_ONCE
kafka.config = {
acks = "all"
request.timeout.ms = 60000
buffer.memory = 33554432
}
protobuf_message_name = Person
protobuf_schema = """
syntax = "proto3";
package wiki.hadoop.protobuf;
option java_outer_classname = "Test2";
message Person {
string name = 1;
int32 id = 2;
string email = 3;
message Address {
string street = 1;
string city = 2;
string state = 3;
string zip = 4;
}
Address address = 4;
map<string, float> attributes = 5;
repeated string phone_numbers = 6;
}
"""
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]