Hello,

I am looking for a way to read data stored in PostgreSQL and don't know whether I should write a Source or use a ParDo. It is stated that a ParDo can be used, but I'm not sure it will solve my problem, so here I am! I managed to write to the database using only ParDo operations, so I guess reading should be possible the same way.

Some details about my use case:

* The Python SDK is used;

* Reading from the database is the first operation of the pipeline, before some calculations are performed;

* It is performed with SQLAlchemy, but could also be done with psycopg2;

* I don't think parallelizing this operation is necessary, as the queries are and will stay really simple (e.g. SELECT foo FROM bar WHERE fuzz).
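Since each query is a single, simple SELECT, the read itself can be an ordinary DB-API 2.0 call that a DoFn's process() invokes once per input element. The sketch below is only an illustration: it uses the standard-library sqlite3 module as a stand-in for psycopg2 (both follow DB-API 2.0, but psycopg2 uses %s placeholders where sqlite3 uses ?), and the table bar and the columns foo and fuzz are hypothetical, taken from the example query with fuzz assumed to be compared against a parameter.

```python
import sqlite3


def fetch_foo(conn, fuzz_value):
    """Run `SELECT foo FROM bar WHERE fuzz = ?` and return the foo values.

    `conn` is any DB-API 2.0 connection. With psycopg2 the placeholder
    would be %s instead of sqlite3's ?.
    """
    cur = conn.cursor()
    try:
        # Parameters are passed separately so the driver handles quoting.
        cur.execute("SELECT foo FROM bar WHERE fuzz = ?", (fuzz_value,))
        return [row[0] for row in cur.fetchall()]
    finally:
        cur.close()


if __name__ == "__main__":
    # In-memory database standing in for the PostgreSQL backup.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE bar (foo TEXT, fuzz INTEGER)")
    conn.executemany(
        "INSERT INTO bar (foo, fuzz) VALUES (?, ?)",
        [("a", 1), ("b", 0), ("c", 1)],
    )
    print(fetch_foo(conn, 1))
    conn.close()
```

Keeping the query in a plain function like this makes it easy to test outside the pipeline and then call from the DoFn with a connection or session opened in start_bundle().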

Here are my DoFn classes:

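/import apache_beam as beam

# `Session` (a SQLAlchemy sessionmaker bound to the PostgreSQL engine) and
# `Entity` (the mapped model class) are assumed to be defined elsewhere.


class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extract PCollection from PostgreSQL
    """

    def start_bundle(self):
        # One SQLAlchemy session per bundle.
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def __init__(self, arg1, arg2):
        # Beam calls start_bundle() without arguments, so the filter
        # values are passed to the constructor instead.
        super(ReadEntityFromPostgresqlFn, self).__init__()
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        # Emit each matching row as a separate output element.
        for entity in entities:
            yield entity/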

As I said, I apply it right after initializing the pipeline:

/p = beam.Pipeline(options=pipeline_options)
psql_entities = p | "Extract Entities from PSQL backup" >> beam.ParDo(ReadDatastoreUsersFromPostgresqlFn())/

Unfortunately, I end up with an /AttributeError: 'PBegin' object has no attribute 'windowing'/ error.

Where did I go wrong? Any input on this topic is welcome.

Thanks for your time,

Jonathan

