Checking to see if this is currently possible:
Read JSON data from a Kafka topic => process using StateFun => write out to Kafka 
in JSON format.

I could have a separate process that reads the source JSON data, converts it to 
Protobuf, and writes it into another Kafka topic, but that sounds inefficient, 
e.g.:
Read JSON data from a Kafka topic => convert JSON to Protobuf => process using 
StateFun => write out to Kafka in Protobuf format => convert Protobuf back to 
JSON.
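
To make the idea concrete, the conversion step I have in mind would look roughly 
like this (just a sketch: the topic names and broker address are placeholders, 
message_pb2 is assumed to be the module generated from the message.proto quoted 
further down, and MyRow is the message defined there):

import json

from kafka import KafkaConsumer, KafkaProducer
from google.protobuf import json_format
from message_pb2 import MyRow  # assumed module name, generated from message.proto

consumer = KafkaConsumer('json-topic', bootstrap_servers=['localhost:9092'])
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

for record in consumer:
    payload = json.loads(record.value)           # original JSON document
    row = MyRow(id=payload['id'])                # only 'id' is guaranteed to exist
    json_format.ParseDict(payload, row.message)  # remaining fields go into the Struct
    producer.send('proto-topic',
                  key=row.id.encode('utf-8'),
                  value=row.SerializeToString()) # Protobuf bytes for the StateFun ingress

This extra hop on both sides is what I would like to avoid if StateFun can 
consume the JSON directly.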

I'd appreciate any advice on how to process JSON messages using StateFun. Also, 
if this is not possible in the current Python SDK, can I do it using the 
Java/Scala SDK?

Thanks.

On 2020/06/15 15:34:39, Sunil Sattiraju <sunilsattir...@gmail.com> wrote: 
> Thanks Igal,
> I don't have control over the data source inside Kafka (the current Kafka topic 
> contains only JSON or Avro formats; I am trying to reproduce this scenario 
> with my test data generator). 
> 
> Is it possible to convert the JSON to Protobuf at the receiving end of the 
> StateFun application?
> 
> On 2020/06/15 14:51:01, Igal Shilman <i...@ververica.com> wrote: 
> > Hi,
> > 
> > The values must be valid, encoded Protobuf messages [1], but in your
> > attached code snippet you are sending UTF-8 encoded JSON strings.
> > You can take a look at this example, which has a generator that produces
> > Protobuf messages [2][3].
> > 
> > [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> > [2]
> > https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> > [3]
> > https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
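> > 
> > The key difference from the generator you attached is the value handed to the
> > Kafka producer: it needs to be the serialized bytes of a Protobuf message
> > rather than a JSON string. Roughly (a sketch based on [2][3]; check the proto
> > in [3] for the exact message and field names):
> > 
> > from messages_pb2 import GreetRequest  # generated from messages.proto in [3]
> > 
> > request = GreetRequest()
> > request.name = "Igal"                  # field name assumed from [3]
> > producer.send(topic='names',
> >               key=request.name.encode('utf-8'),
> >               value=request.SerializeToString())  # Protobuf bytes, not json.dumps(...)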
> > 
> > On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <sunilsattir...@gmail.com>
> > wrote:
> > 
> > > Hi, based on the example at
> > > https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> > > I am trying to ingest JSON data from Kafka, but I have been unable to get it
> > > working based on the examples.
> > >
> > > event-generator.py
> > >
> > > import json
> > > import sys
> > > import time
> > >
> > > from kafka import KafkaProducer
> > >
> > > # KAFKA_BROKER and random_requests_dict() are defined elsewhere in the script;
> > > # random_requests_dict() yields dicts shaped like the example built below.
> > >
> > > def produce():
> > >     request = {}
> > >     request['id'] = "abc-123"
> > >     request['field1'] = "field1-1"
> > >     request['field2'] = "field2-2"
> > >     request['field3'] = "field3-3"
> > >     if len(sys.argv) == 2:
> > >         delay_seconds = int(sys.argv[1])
> > >     else:
> > >         delay_seconds = 1
> > >     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
> > >     for request in random_requests_dict():
> > >         producer.send(topic='test-topic',
> > >                       value=json.dumps(request).encode('utf-8'))
> > >         producer.flush()
> > >         time.sleep(delay_seconds)
> > >
> > > Below is the proto definition for the JSON data (I don't always know all
> > > the fields, but I know the id field definitely exists).
> > > message.proto
> > >
> > > syntax = "proto3";
> > >
> > > import "google/protobuf/struct.proto";
> > >
> > > message MyRow {
> > >     string id = 1;
> > >     google.protobuf.Struct message = 2;
> > > }
> > >
> > > Below is the greeter that receives the data,
> > > tokenizer.py (the same as greeter.py, but saving the id as state instead of
> > > counting).
> > >
> > >
> > > # app, handler, MyRow, etc. come from the rest of tokenizer.py, wired up the
> > > # same way as in the example's greeter.py
> > > @app.route('/statefun', methods=['POST'])
> > > def handle():
> > >     my_row = MyRow()
> > >     data = my_row.ParseFromString(request.data)  # Is this the right way to do it?
> > >     response_data = handler(request.data)
> > >     response = make_response(response_data)
> > >     response.headers.set('Content-Type', 'application/octet-stream')
> > >     return response
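> > >
> > > For comparison, greeter.py in the example does not parse the request body
> > > itself; it passes request.data straight to the SDK handler, and the bound
> > > function receives the already-decoded message. My understanding of that
> > > pattern, roughly (the function/type names here are my own):
> > >
> > > from statefun import StatefulFunctions, RequestReplyHandler
> > >
> > > functions = StatefulFunctions()
> > >
> > > @functions.bind("example/tokenizer")      # type name is my own choice
> > > def tokenize(context, my_row: MyRow):
> > >     # my_row arrives here as a decoded MyRow; no manual ParseFromString
> > >     print(my_row.id)
> > >
> > > handler = RequestReplyHandler(functions)  # this is what handle() above calls
> > >
> > > That made me wonder whether the manual ParseFromString above is needed at all.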
> > >
> > >
> > > But below is the error message I get. I am new to Protobuf and would
> > > appreciate any help.
> > >
> > > 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> > > Traceback (most recent call last):
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447,
> > > in wsgi_app
> > >     response = self.full_dispatch_request()
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952,
> > > in full_dispatch_request
> > >     rv = self.handle_user_exception(e)
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821,
> > > in handle_user_exception
> > >     reraise(exc_type, exc_value, tb)
> > >   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39,
> > > in reraise
> > >     raise value
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950,
> > > in full_dispatch_request
> > >     rv = self.dispatch_request()
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936,
> > > in dispatch_request
> > >     return self.view_functions[rule.endpoint](**req.view_args)
> > >   File "/app/tokenizer.py", line 101, in handle
> > >     response_data = handler(data)
> > >   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py",
> > > line 38, in __call__
> > >     request.ParseFromString(request_bytes)
> > >   File
> > > "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line
> > > 199, in ParseFromString
> > >     return self.MergeFromString(serialized)
> > >   File
> > > "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py",
> > > line 1131, in MergeFromString
> > >     serialized = memoryview(serialized)
> > > TypeError: memoryview: a bytes-like object is required, not 'int'
> > >
> > >
> > 
> 
