Hi Jonathan,
I had a requirement similar to yours, but for MongoDB, and I tentatively
wrote an IO connector for it that you can find here:
https://github.com/PEAT-AI/beam-extended
It is working with the DirectRunner in Read mode (I still need to test it
on Dataflow).
But I faced some issues with the connector's serializability in Write mode.
Long story short, pymongo is not serializable, and this is something you
will have to check for SQLAlchemy as well.
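For example, one pattern that works around the pickling problem is to keep
only a plain connection string on the DoFn and build the engine/session in
start_bundle, on the worker. Here is a minimal sketch, assuming SQLAlchemy
(QueryPostgresFn and db_url are made-up names, not a real connector):

import apache_beam as beam

class QueryPostgresFn(beam.DoFn):
    """Hypothetical DoFn that defers all unpicklable state."""

    def __init__(self, db_url):
        # A plain connection string pickles fine; a live engine or
        # session does not, so never store those in __init__.
        self._db_url = db_url
        self._session = None

    def start_bundle(self):
        # Built on the worker, after the DoFn has been deserialized.
        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker
        engine = create_engine(self._db_url)
        self._session = sessionmaker(bind=engine)()

    def process(self, query):
        from sqlalchemy import text
        for row in self._session.execute(text(query)):
            yield dict(row)

    def finish_bundle(self):
        if self._session is not None:
            self._session.close()
            self._session = None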
Generally speaking, take a look at this part of the doc if you want to know
more about IO connector development:
https://beam.apache.org/documentation/sdks/python-custom-io/
I am not an expert, but I'll be glad to help you, since I will also need to
support PostgreSQL in the near future!
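
By the way, about the AttributeError you quote below: beam.ParDo needs an
input PCollection, so it cannot be applied directly to the pipeline object
(a PBegin). A common workaround is to seed the pipeline with a single
element via beam.Create and run the query inside the ParDo. A rough sketch,
reusing the hypothetical QueryPostgresFn from above:

p = beam.Pipeline(options=pipeline_options)
psql_entities = (
    p
    # ParDo cannot consume a PBegin, so Create provides a one-element
    # PCollection for it to work on.
    | "Seed" >> beam.Create(["SELECT foo FROM bar WHERE fuzz"])
    | "Extract Entities from PSQL backup"
    >> beam.ParDo(QueryPostgresFn("postgresql://..."))
)
p.run()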
Cheers,
Pascal



On Tue, 2 Oct 2018 at 10:41, Jonathan Perron <[email protected]>
wrote:

> Hello,
>
> I am looking for some way to access data stored in PostgreSQL and don't
> know if I should go for a Sink or ParDo operations. It is stated that ParDo
> could be used but I'm not sure this is what will solve my problem, so here
> I am !I managed to write in the database with only ParDo operations, so I
> guess it is also possible here.
>
> Some details about my use case:
>
> * The Python SDK is used;
>
> * Reading from the database is the first operation of the pipeline, before
> making some calculations;
>
> * It is performed with SQLAlchemy, but could also be done with psycopg2;
>
> * I don't think parallelizing this operation is necessary, as the queries
> are and will stay really simple (e.g. SELECT foo FROM bar WHERE fuzz).
>
> Here are my DoFn classes:
>
> class ExtractFromPostgreSQLFn(beam.DoFn):
>     """
>     Extract PCollection from PostgreSQL
>     """
>     def start_bundle(self):
>         self._session = Session()
>
>     def process(self, element):
>         raise NotImplementedError
>
>     def finish_bundle(self):
>         self._session.close()
>
>
> class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
>     def start_bundle(self, arg1, arg2):
>         super(ReadEntityFromPostgresqlFn, self).start_bundle()
>         self._arg1 = arg1
>         self._arg2 = arg2
>
>     def process(self, element):
>         entities = (
>             self._session.query(Entity)
>             .filter(Entity.arg1 == self._arg1)
>             .filter(Entity.arg2 == self._arg2)
>             .all()
>         )
>         yield (self._arg1, self._arg2)
>
> As I said, I used it just after the initialization of the pipeline:
>
>
> p = beam.Pipeline(options=pipeline_options)
> psql_entities = p | "Extract Entities from PSQL backup" >> beam.ParDo(
>     ReadDatastoreUsersFromPostgresqlFn())
>
> Unfortunately, I end up with an AttributeError: 'PBegin' object has no
> attribute 'windowing' error.
>
> Where did I make a mistake? I'll take any input you can provide on
> this topic.
>
> Thanks for your time,
>
> Jonathan
>

-- 

Pascal Gula
Senior Data Engineer / Scientist
+49 (0)176 34232684
www.plantix.net
PEAT GmbH
Kastanienallee 4
10435 Berlin // Germany
