Checking to see whether this is currently possible: read JSON data from a Kafka topic => process using StateFun => write out to Kafka in JSON format.
I could have a separate process that reads the source JSON data and converts it to Protobuf into another Kafka topic, but that sounds inefficient, e.g.: read JSON data from Kafka topic => convert JSON to Protobuf => process using StateFun => write out to Kafka in Protobuf format => convert the Protobuf back to a JSON message.

I'd appreciate any advice on how to process JSON messages using StateFun. Also, if this is not possible in the current Python SDK, can I do it using the Java/Scala SDK? Thanks.

On 2020/06/15 15:34:39, Sunil Sattiraju <sunilsattir...@gmail.com> wrote:
> Thanks Igal,
> I don't have control over the data source inside Kafka (the current Kafka topic
> contains either JSON or Avro formats only; I am trying to reproduce this
> scenario using my test data generator).
>
> Is it possible to convert the JSON to proto at the receiving end of the StateFun
> application?
>
> On 2020/06/15 14:51:01, Igal Shilman <i...@ververica.com> wrote:
> > Hi,
> >
> > The values must be valid encoded Protobuf messages [1], while in your
> > attached code snippet you are sending UTF-8 encoded JSON strings.
> > You can take a look at this example with a generator that produces Protobuf
> > messages [2][3].
> >
> > [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> > [2] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> > [3] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
> >
> > On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <sunilsattir...@gmail.com>
> > wrote:
> >
> > > Hi,
> > > Based on the example from
> > > https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> > > I am trying to ingest JSON data from Kafka, but I am unable to get it working
> > > based on the examples.
> > >
> > > event-generator.py:
> > >
> > > def produce():
> > >     request = {}
> > >     request['id'] = "abc-123"
> > >     request['field1'] = "field1-1"
> > >     request['field2'] = "field2-2"
> > >     request['field3'] = "field3-3"
> > >     if len(sys.argv) == 2:
> > >         delay_seconds = int(sys.argv[1])
> > >     else:
> > >         delay_seconds = 1
> > >     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
> > >     for request in random_requests_dict():
> > >         producer.send(topic='test-topic',
> > >                       value=json.dumps(request).encode('utf-8'))
> > >         producer.flush()
> > >         time.sleep(delay_seconds)
> > >
> > > Below is the proto definition of the JSON data (I don't always know all
> > > the fields, but I know the id field definitely exists).
> > >
> > > message.proto:
> > >
> > > message MyRow {
> > >     string id = 1;
> > >     google.protobuf.Struct message = 2;
> > > }
> > >
> > > Below is the greeter that receives the data,
> > > tokenizer.py (same as greeter.py, but saving the state of id instead of
> > > counting):
> > >
> > > @app.route('/statefun', methods=['POST'])
> > > def handle():
> > >     my_row = MyRow()
> > >     data = my_row.ParseFromString(request.data)  # Is this the right way to do it?
> > >     response_data = handler(request.data)
> > >     response = make_response(response_data)
> > >     response.headers.set('Content-Type', 'application/octet-stream')
> > >     return response
> > >
> > > But below is the error message.
> > > I am a newbie with proto and would appreciate any help.
> > >
> > > 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> > > Traceback (most recent call last):
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
> > >     response = self.full_dispatch_request()
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
> > >     rv = self.handle_user_exception(e)
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
> > >     reraise(exc_type, exc_value, tb)
> > >   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
> > >     raise value
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
> > >     rv = self.dispatch_request()
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
> > >     return self.view_functions[rule.endpoint](**req.view_args)
> > >   File "/app/tokenizer.py", line 101, in handle
> > >     response_data = handler(data)
> > >   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
> > >     request.ParseFromString(request_bytes)
> > >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
> > >     return self.MergeFromString(serialized)
> > >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
> > >     serialized = memoryview(serialized)
> > > TypeError: memoryview: a bytes-like object is required, not 'int'
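
Two observations on the thread above. First, the TypeError in the traceback: `ParseFromString` parses the message in place and returns the number of bytes consumed (an int), so `data = my_row.ParseFromString(request.data)` binds an int, and (per the traceback's `handler(data)` at tokenizer.py line 101) that int is what reaches the SDK, which then tries to parse it as bytes. Second, on converting JSON to Protobuf at the receiving end: since the `MyRow` definition above already uses `google.protobuf.Struct`, arbitrary JSON can be wrapped and unwrapped with the standard `google.protobuf.json_format` module. The sketch below shows that round trip in isolation; it is a minimal illustration assuming only the `protobuf` Python package, not StateFun-specific code, and the field names are taken from the generator above.

```python
import json

from google.protobuf import json_format, struct_pb2


def json_to_struct(json_bytes: bytes) -> struct_pb2.Struct:
    """Parse raw JSON bytes (as read from Kafka) into a google.protobuf.Struct."""
    s = struct_pb2.Struct()
    # json_format.Parse fills the Struct in place and also returns it.
    json_format.Parse(json_bytes.decode("utf-8"), s)
    return s


def struct_to_json(s: struct_pb2.Struct) -> bytes:
    """Serialize a Struct back to JSON bytes for the egress side."""
    return json_format.MessageToJson(s).encode("utf-8")


# Round trip with a payload shaped like the generator's output above.
payload = json.dumps({"id": "abc-123", "field1": "field1-1"}).encode("utf-8")
wrapped = json_to_struct(payload)
# Struct supports dict-style access to its fields.
assert wrapped["id"] == "abc-123"
assert json.loads(struct_to_json(wrapped)) == {"id": "abc-123", "field1": "field1-1"}
```

Note that this only produces valid Protobuf bytes (`wrapped.SerializeToString()`) once the wrapping runs somewhere before the StateFun Kafka ingress deserializes the value, since, as Igal points out above, the ingress expects the Kafka record values to already be encoded Protobuf.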