Thanks Igal,
I don't have control over the data source in Kafka (the current Kafka topic
contains only JSON or Avro messages; I am trying to reproduce this scenario
with my test data generator).

Is it possible to convert the JSON to Protobuf at the receiving end of the
StateFun application?
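Here is a minimal sketch of that kind of conversion. It assumes the `protobuf` package is installed and that the conversion can run somewhere before the bytes reach StateFun, for example in a small bridge between two topics. Where it should actually live is not settled in this thread. The helper names `json_to_proto_bytes` and `proto_bytes_to_dict` are hypothetical. The sketch pops the required `id` field and packs all remaining, unknown fields into a `google.protobuf.Struct`, mirroring the `message` field of `MyRow`. With the generated `message_pb2.MyRow` class you would instead set `row.id`, call `row.message.update(...)`, and send `row.SerializeToString()`.

```python
import json
from typing import Tuple

from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct


def json_to_proto_bytes(raw: bytes) -> Tuple[str, bytes]:
    """Hypothetical helper: turn one UTF-8 JSON record into (id, Struct bytes).

    The 'id' key must be present; all remaining keys, whatever they are,
    are packed into a google.protobuf.Struct (like MyRow.message).
    """
    record = json.loads(raw.decode("utf-8"))
    row_id = record.pop("id")  # raises KeyError if the record has no id
    rest = Struct()
    rest.update(record)  # note: JSON numbers are stored as doubles in a Struct
    # With the generated class (assumed name message_pb2.MyRow) this would be:
    #   row = MyRow(id=row_id); row.message.update(record)
    #   return row.SerializeToString()
    return row_id, rest.SerializeToString()


def proto_bytes_to_dict(payload: bytes) -> dict:
    """Hypothetical helper: decode serialized Struct bytes back into a dict."""
    rest = Struct()
    rest.ParseFromString(payload)
    return json_format.MessageToDict(rest)


if __name__ == "__main__":
    raw = json.dumps({"id": "abc-123", "field1": "field1-1"}).encode("utf-8")
    row_id, payload = json_to_proto_bytes(raw)
    print(row_id)                        # abc-123
    print(proto_bytes_to_dict(payload))  # {'field1': 'field1-1'}
```

Using `Struct` keeps the schema open-ended, which fits records where only `id` is guaranteed. The trade-off is that numeric fields lose their integer type inside the message.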

On 2020/06/15 14:51:01, Igal Shilman <i...@ververica.com> wrote: 
> Hi,
> 
> The values must be valid encoded Protobuf messages [1], while in your
> attached code snippet you are sending utf-8 encoded JSON strings.
> You can take a look at this example with a generator that produces Protobuf
> messages [2][3]
> 
> [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> [2]
> https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> [3]
> https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
> 
> On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <sunilsattir...@gmail.com>
> wrote:
> 
> > Hi, based on the example from
> > https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> > I am trying to ingest JSON data from Kafka, but I have not been able to get
> > it working by following the examples.
> >
> > event-generator.py
> >
> > import json
> > import sys
> > import time
> >
> > from kafka import KafkaProducer
> >
> > KAFKA_BROKER = "kafka-broker:9092"  # assumed broker address; adjust as needed
> >
> > def produce():
> >     request = {
> >         'id': "abc-123",
> >         'field1': "field1-1",
> >         'field2': "field2-2",
> >         'field3': "field3-3",
> >     }
> >     # Optional CLI argument: delay between messages, in seconds
> >     delay_seconds = int(sys.argv[1]) if len(sys.argv) == 2 else 1
> >     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
> >     while True:
> >         # Sends UTF-8 encoded JSON, not a serialized Protobuf message
> >         producer.send(topic='test-topic',
> >                       value=json.dumps(request).encode('utf-8'))
> >         producer.flush()
> >         time.sleep(delay_seconds)
> >
> > Below is the proto definition for the JSON data (I don't always know all
> > the fields, but I know the id field always exists):
> > message.proto
> >
> > syntax = "proto3";
> >
> > import "google/protobuf/struct.proto";
> >
> > message MyRow {
> >     string id = 1;
> >     google.protobuf.Struct message = 2;
> > }
> >
> > Below is the greeter that receives the data, tokenizer.py (the same as
> > greeter.py, except that it saves the id in state instead of counting):
> >
> >
> > @app.route('/statefun', methods=['POST'])
> > def handle():
> >     my_row = MyRow()
> >     data = my_row.ParseFromString(request.data)  # Is this the right way to do it?
> >     response_data = handler(request.data)
> >     response = make_response(response_data)
> >     response.headers.set('Content-Type', 'application/octet-stream')
> >     return response
> >
> >
> > But I get the error below. I am new to Protobuf and would appreciate
> > any help.
> >
> > 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> > Traceback (most recent call last):
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
> >     response = self.full_dispatch_request()
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
> >     rv = self.handle_user_exception(e)
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
> >     reraise(exc_type, exc_value, tb)
> >   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
> >     raise value
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
> >     rv = self.dispatch_request()
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
> >     return self.view_functions[rule.endpoint](**req.view_args)
> >   File "/app/tokenizer.py", line 101, in handle
> >     response_data = handler(data)
> >   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
> >     request.ParseFromString(request_bytes)
> >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
> >     return self.MergeFromString(serialized)
> >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
> >     serialized = memoryview(serialized)
> > TypeError: memoryview: a bytes-like object is required, not 'int'
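This `TypeError` can be reproduced in isolation. `ParseFromString` fills the message in place and returns the number of bytes it read, so `data` in `handle()` is an `int`. The traceback's `handler(data)` call then passes that `int` to the StateFun handler. The handler parses the request body itself, as the `request.ParseFromString(request_bytes)` frame shows, so it fails when it receives a number instead of bytes. Below is a minimal sketch that assumes only the `protobuf` package and uses `Struct` as a stand-in for `MyRow`:

```python
from google.protobuf.struct_pb2 import Struct

# Struct stands in for MyRow here; generated messages behave the same way.
original = Struct()
original.update({"id": "abc-123"})
payload = original.SerializeToString()

parsed = Struct()
result = parsed.ParseFromString(payload)  # fills `parsed` in place

print(type(result).__name__)   # int: the number of bytes read, not a message
print(result == len(payload))  # True
print(parsed["id"])            # abc-123
```

This is consistent with the greeter example, which passes `request.data` straight to `handler(...)` and does not parse it in the Flask route. The `MyRow` payload would then be handled inside the bound function.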
> >
> >
> 
