Hello again,
Thanks to the estimate_pi.py example
(https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/estimate_pi.py),
I saw that the first step of my pipeline must be a beam.Create() with an
iterable. Doing so solved my problem.
Sorry for my mistake.
On 2018/10/02 08:40:46, Jonathan Perron <[email protected]> wrote:
> Hello,
>
> I am looking for some way to access data stored in PostgreSQL and don't
> know if I should go for a Source or for ParDo operations. It is stated
> that ParDo could be used, but I'm not sure this is what will solve my
> problem, so here I am! I managed to write to the database with only ParDo
> operations, so I guess it is also possible here.
>
> Some details about my use case:
>
> * The Python SDK is used;
>
> * Reading from the database is the first operation of the pipeline, before
> making some calculations;
>
> * It is performed with SQLAlchemy, but could also be done with psycopg2;
>
> * I don't think parallelizing this operation is necessary, as the queries
> are and will stay really simple (e.g. SELECT foo FROM bar WHERE fuzz).
>
> Here are my DoFn classes:
>
>     import apache_beam as beam
>
>
>     class ExtractFromPostgreSQLFn(beam.DoFn):
>         """
>         Extract PCollection from PostgreSQL
>         """
>
>         def start_bundle(self):
>             # Session is a SQLAlchemy sessionmaker defined elsewhere.
>             self._session = Session()
>
>         def process(self, element):
>             raise NotImplementedError
>
>         def finish_bundle(self):
>             self._session.close()
>
>
>     class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
>         def __init__(self, arg1, arg2):
>             # start_bundle() takes no arguments, so the filter values are
>             # passed to the constructor instead.
>             super(ReadEntityFromPostgresqlFn, self).__init__()
>             self._arg1 = arg1
>             self._arg2 = arg2
>
>         def process(self, element):
>             # Entity is a SQLAlchemy mapped class defined elsewhere.
>             entities = (
>                 self._session.query(Entity)
>                 .filter(Entity.arg1 == self._arg1)
>                 .filter(Entity.arg2 == self._arg2)
>                 .all()
>             )
>             for entity in entities:
>                 yield entity
>
> As I said, I used it just after the initialization of the pipeline:
>
>     p = beam.Pipeline(options=pipeline_options)
>     psql_entities = p | "Extract Entities from PSQL backup" >> \
>         beam.ParDo(ReadEntityFromPostgresqlFn(arg1, arg2))
>
> Unfortunately, I end up with an "AttributeError: 'PBegin' object has no
> attribute 'windowing'" error.
>
> Where did I make a mistake? I welcome any input you can give me on this
> topic.
>
> Thanks for your time,
>
> Jonathan