Hi Jonathan,

I had a similar requirement to yours, but for MongoDB, and I tentatively wrote an IO connector for it that you can find here: https://github.com/PEAT-AI/beam-extended

It is working with the DirectRunner in read mode (I still need to test it on Dataflow), but I faced some issues with connector serializability in write mode. Long story short, pymongo is not serializable, and that is something you will have to validate for SQLAlchemy as well. Generally speaking, take a look at this part of the documentation if you want to know more about IO connector development: https://beam.apache.org/documentation/sdks/python-custom-io/

I am not an expert, but I'll be glad to help you, since I will also need to support PostgreSQL in the near future!

Cheers,
Pascal
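P.S. For the serializability issue, here is a minimal sketch of the usual workaround, assuming SQLAlchemy; the class name, connection URL and query are placeholders of mine, not from your code. The idea is to keep only plain, picklable configuration on the DoFn and build the engine lazily on the worker:

    import apache_beam as beam
    import sqlalchemy


    class ReadFromPostgresFn(beam.DoFn):
        """Sketch: runs SQL queries received as elements (names are placeholders)."""

        def __init__(self, connection_url):
            # Keep only plain, picklable configuration on the DoFn itself.
            self._connection_url = connection_url
            self._engine = None

        def start_bundle(self):
            # Build the (non-picklable) engine lazily on the worker, so it
            # never has to travel with the serialized DoFn.
            if self._engine is None:
                self._engine = sqlalchemy.create_engine(self._connection_url)

        def process(self, query):
            # Each input element is a SQL string; emit one dict per row
            # (SQLAlchemy 1.x-style row access).
            with self._engine.connect() as conn:
                for row in conn.execute(sqlalchemy.text(query)):
                    yield dict(row)

Because the engine is created in start_bundle rather than __init__, the only thing that gets pickled and shipped to the workers is the connection string.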
On Tue, 2 Oct 2018 at 10:41, Jonathan Perron <[email protected]> wrote:

> Hello,
>
> I am looking for some way to access data stored in PostgreSQL and don't
> know if I should go for a Sink or for ParDo operations. It is stated that
> ParDo could be used, but I'm not sure this is what will solve my problem,
> so here I am! I managed to write to the database with only ParDo
> operations, so I guess it is also possible here.
>
> Some details about my use case:
>
> * The Python SDK is used;
> * Reading from the database is the first operation of the pipeline,
>   before making some calculations;
> * It is performed with SQLAlchemy, but could also be done with psycopg2;
> * I don't think parallelizing this operation is necessary, as the queries
>   are and will stay really simple (i.e. SELECT foo FROM bar WHERE fuzz).
>
> Here are my DoFn classes:
>
> class ExtractFromPostgreSQLFn(beam.DoFn):
>     """Extract PCollection from PostgreSQL"""
>     def start_bundle(self):
>         self._session = Session()
>
>     def process(self, element):
>         raise NotImplementedError
>
>     def finish_bundle(self):
>         self._session.close()
>
>
> class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
>     def start_bundle(self, arg1, arg2):
>         super(ReadEntityFromPostgresqlFn, self).start_bundle()
>         self._arg1 = arg1
>         self._arg2 = arg2
>
>     def process(self, element):
>         entities = (
>             self._session.query(Entity)
>             .filter(Entity.arg1 == self._arg1)
>             .filter(Entity.arg2 == self._arg2)
>             .all()
>         )
>         yield (self._arg1, self._arg2)
>
> As I said, I used it just after the initialization of the pipeline:
>
> p = beam.Pipeline(options=pipeline_options)
> psql_entities = p | "Extract Entities from PSQL backup" >> beam.ParDo(
>     ReadEntityFromPostgresqlFn()
> )
>
> Unfortunately, I end up with an "AttributeError: 'PBegin' object has no
> attribute 'windowing'" error.
>
> Where did I make a mistake? I'll take any input you can provide on this
> topic.
>
> Thanks for your time,
>
> Jonathan
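Regarding the AttributeError in your message above: a ParDo cannot be applied directly to the pipeline object (a PBegin); it needs an input PCollection. A common workaround is to seed the pipeline with a dummy element via beam.Create and run the query inside the DoFn. A minimal sketch, reusing the hypothetical ReadFromPostgresFn from above and assuming your pipeline_options plus a connection_url variable:

    import apache_beam as beam

    p = beam.Pipeline(options=pipeline_options)
    psql_entities = (
        p
        # beam.Create turns the PBegin into a real PCollection, so the
        # downstream ParDo has something to consume.
        | "Seed query" >> beam.Create(["SELECT foo FROM bar WHERE fuzz"])
        | "Extract Entities from PSQL backup"
            >> beam.ParDo(ReadFromPostgresFn(connection_url))
    )
    p.run()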
