zhangshenghang commented on PR #7361:
URL: https://github.com/apache/seatunnel/pull/7361#issuecomment-2278947992

   @Hisoka-X @hailin0 
   By configuring the protobuf data structure, we can successfully receive and 
send related messages.
   
   Below are example configurations. I will continue to improve the related 
documentation and unit tests.
   
   Please help review and let me know what needs to be optimized. Thanks.
   
   ```
   # Defining the runtime environment
   env {
       parallelism = 1
       job.mode = "STREAMING"
       flink.execution.checkpointing.interval=5000
        flink.execution.restart.strategy = failure-rate
        flink.execution.restart.failureInterval = 60000
        flink.execution.restart.failureRate = 100
        flink.execution.restart.delayInterval = 10000
      # execution.restart.strategy = fixed-delay
      # execution.restart.attempts = 11
      # execution.restart.delayBetweenAttempts = 10000
   }
   source {
     Kafka {
       schema = {
         fields {
           name = string
           id = int
           email = string
           Address {
               city = string
               state = string
               street = string
               }
           attributes = "map<string,float>"
           }
       }
       topic = "test"
       format = protobuf
       protobuf_message_name = Person
       protobuf_schema = """
                 syntax = "proto3";
   
                 package wiki.hadoop.protobuf;
   
                 option java_outer_classname = "Test2";
   
                 message Person {
                   string name = 1;
                   int32 id = 2;
                   string email = 3;
   
                   message Address {
                     string street = 1;
                     string city = 2;
                     string state = 3;
                     string zip = 4;
                   }
   
                   Address address = 4;
   
                   map<string, float> attributes = 5;
   
                   repeated string phone_numbers = 6;
                 }
       """
       consumer.group   = "test-202301011"
       bootstrap.servers = "172.16.24.194:9092"
       kafka.config = {
         client.id = client_1
         max.poll.records = 500
         auto.offset.reset = "latest"
         enable.auto.commit = "false"
       }
       result_table_name = "kafka_table"
     }
   }
   transform {
     Sql {
       source_table_name = "kafka_table"
       result_table_name = "kafka_table2"
       query = "select name, Address.state ,Address.city ,attributes.test1 from kafka_table"
     }
   }
   
   
   sink {
   
       Console{
       source_table_name = "kafka_table2"
       }
   
   }
   ```
   
   ```
   # Defining the runtime environment
   env {
       parallelism = 1
       job.mode = "STREAMING"
       flink.execution.checkpointing.interval=5000
        flink.execution.restart.strategy = failure-rate
        flink.execution.restart.failureInterval = 60000
        flink.execution.restart.failureRate = 100
        flink.execution.restart.delayInterval = 10000
   
   }
   source {
      FakeSource {
         parallelism = 1
         result_table_name = "fake"
         row.num = 16
         schema = {
           fields {
                     name = string
                     id = int
                     email = string
                     Address {
                         city = string
                         state = string
                         street = string
                         }
                     attributes = "map<string,float>"
                     phone_numbers = "array<string>"
           }
         }
       }
   }
   
   sink {
     Console{}
     kafka {
         topic = "test_topic"
         bootstrap.servers = "172.16.24.194:9092"
         format = protobuf
         kafka.request.timeout.ms = 60000
   #       semantics = EXACTLY_ONCE
         kafka.config = {
           acks = "all"
           request.timeout.ms = 60000
           buffer.memory = 33554432
         }
         protobuf_message_name = Person
         protobuf_schema = """
                 syntax = "proto3";
   
                 package wiki.hadoop.protobuf;
   
                 option java_outer_classname = "Test2";
   
                 message Person {
                   string name = 1;
                   int32 id = 2;
                   string email = 3;
   
                   message Address {
                     string street = 1;
                     string city = 2;
                     string state = 3;
                     string zip = 4;
                   }
   
                   Address address = 4;
   
                   map<string, float> attributes = 5;
   
                   repeated string phone_numbers = 6;
                 }
                 """
     }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to