I found that I had forgotten to apply a beam.Create() to the pipeline before the ParDo, so there is no need to answer me: this solves my problem.

On 2018/10/02 08:40:46, Jonathan Perron <[email protected]> wrote:
> Hello,
>
> I am looking for a way to read data stored in PostgreSQL and don't
> know whether I should go for a Source or for ParDo operations. It is
> stated that ParDo could be used, but I'm not sure this is what will
> solve my problem, so here I am! I managed to write to the database
> with ParDo operations only, so I guess it is also possible here.
>
> Some details about my use case:
>
> * The Python SDK is used;
>
> * Reading from the database is the first operation of the pipeline,
>   before some calculations are made;
>
> * It is performed with SQLAlchemy, but could also be done with psycopg2;
>
> * I don't think parallelizing this operation is necessary, as the
>   queries are and will stay really simple (e.g. SELECT foo FROM bar
>   WHERE fuzz).
>
> Here are my DoFn classes:
>
> import apache_beam as beam
>
> # Session (a SQLAlchemy sessionmaker) and Entity (a mapped model) are
> # assumed to be defined elsewhere in the project.
>
> class ExtractFromPostgreSQLFn(beam.DoFn):
>     """
>     Extract PCollection from PostgreSQL
>     """
>
>     def start_bundle(self):
>         # Open one session per bundle.
>         self._session = Session()
>
>     def process(self, element):
>         raise NotImplementedError
>
>     def finish_bundle(self):
>         self._session.close()
>
>
> class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
>     def __init__(self, arg1, arg2):
>         # Beam calls start_bundle() without arguments, so the query
>         # parameters are passed through the constructor instead.
>         super(ReadEntityFromPostgresqlFn, self).__init__()
>         self._arg1 = arg1
>         self._arg2 = arg2
>
>     def process(self, element):
>         entities = (
>             self._session.query(Entity)
>             .filter(Entity.arg1 == self._arg1)
>             .filter(Entity.arg2 == self._arg2)
>             .all()
>         )
>         # Emit each matching row as its own element.
>         for entity in entities:
>             yield entity
>
> As I said, I used it just after the initialization of the pipeline:
>
> p = beam.Pipeline(options=pipeline_options)
> psql_entities = (
>     p
>     | "Extract Entities from PSQL backup"
>     >> beam.ParDo(ReadEntityFromPostgresqlFn(arg1, arg2))
> )
>
> Unfortunately, I end up with an "AttributeError: 'PBegin' object has
> no attribute 'windowing'" error.
>
> Where did I make a mistake? I welcome any input you can provide on
> this topic.
>
> Thanks for your time,
>
> Jonathan

--
OptimData
71 rue Desnouettes, 75015 Paris, France
http://inuse.eu



