Hi all,
I was just using the Postgres sink [0] and got an error. I am using the following Docker container:

    docker run --name "streampipes_postgis" -e POSTGRES_USER=streampipes -e POSTGRES_PASS=streampipes -e POSTGRES_DBNAME=streampipes -p 65432:5432 -d -t kartoza/postgis

The database is created, and so is the table, but saving the events in the DB does not work:

    19:54:00.875 SP [Thread-2] WARN o.a.s.s.d.jvm.postgresql.PostgreSql - USERLOG - correspondingPipeline: 839d7efd-d561-4731-80f9-343610fcdc5d - peURI: http://172.17.0.1:8005/sec/org.apache.streampipes.sinks.databases.jvm.postgresql/839d7efd-d561-4731-80f9-343610fcdc5d-org.streampipes.connect.ebf6c159-7576-4f7d-8e43-a79d4b5f8080-postgresql-0 - Table 'testtable' was unexpectedly not found and gets recreated.

    19:54:00.880 SP [Thread-2] ERROR o.a.s.s.d.jvm.postgresql.PostgreSql - USERLOG - correspondingPipeline: 839d7efd-d561-4731-80f9-343610fcdc5d - peURI: http://172.17.0.1:8005/sec/org.apache.streampipes.sinks.databases.jvm.postgresql/839d7efd-d561-4731-80f9-343610fcdc5d-org.streampipes.connect.ebf6c159-7576-4f7d-8e43-a79d4b5f8080-postgresql-0 - ERROR: relation "testtable" already exists

The last message appears each time an event is to be saved.

I had a quick look at the code but have not been able to find the reason so far. The code has changed a lot since the last time I looked at it. In the ensureDatabaseExists method in the jdbcClient I also saw this comment:

    // Checks whether the database already exists (using catalogs has not worked with postgres)

With the following queries I can check in Postgres whether a database, table or even schema already exists. Maybe this is helpful?
    String checkTableName = "SELECT EXISTS (SELECT table_name FROM information_schema.tables WHERE table_schema = '" + schemaName + "' AND table_name = '" + tableName + "') AS result;";
    String checkDatabaseName = "SELECT EXISTS (SELECT 1 FROM pg_database WHERE datname = '" + databaseName + "') AS result;";
    String checkSchemaName = "SELECT EXISTS (SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname = '" + schemaName + "') AS result;";

I used this method:

    // Needs java.sql.Connection, Statement, ResultSet and SQLException.
    private boolean checkExistInPG(Connection conn, String query) {
      boolean exists = false;
      try (Statement stmt = conn.createStatement();
           ResultSet rs = stmt.executeQuery(query)) {
        // The query returns a single row with one boolean column.
        if (rs.next()) {
          exists = rs.getBoolean(1);
        }
      } catch (SQLException e) {
        throw new SpRuntimeException("Check if database, table or schema exists went wrong: "
            + e.getSQLState() + "\n" + e.getMessage());
      }
      // Returning here, not from a finally block, lets the exception above propagate.
      return exists;
    }

I would also like to start a discussion about extending the Postgres sink. Would it be a good idea to let the user specify the "db schema" as well? At the moment the table is only written to the public schema.

I saw that the jdbcClient is also used for the IoTDB sink. Would such a change be compatible with it? Is that also a Postgres DB? I am asking because I am thinking about extending the Postgres sink with the PostGIS extension as well.

Sorry, this email is longer than expected :-D

Kind regards,
Florian

[0] https://github.com/apache/incubator-streampipes-extensions/tree/dev/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql

Florian Micklich
Disy Informationssysteme GmbH
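
P.S. The existence checks above could also be written with bound parameters instead of string concatenation, so that schema, table and database names are never spliced into the SQL text. This is only a minimal sketch under my own assumptions: the class name PgExistenceChecks and the helper names are made up for illustration and are not part of the current jdbcClient, and the checked exception is simply passed on instead of being wrapped in SpRuntimeException.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical helper class, not part of StreamPipes.
final class PgExistenceChecks {

  // Same catalog lookups as in the email, with "?" placeholders for the names.
  public static final String CHECK_TABLE =
      "SELECT EXISTS (SELECT 1 FROM information_schema.tables "
          + "WHERE table_schema = ? AND table_name = ?)";
  public static final String CHECK_DATABASE =
      "SELECT EXISTS (SELECT 1 FROM pg_database WHERE datname = ?)";
  public static final String CHECK_SCHEMA =
      "SELECT EXISTS (SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ?)";

  private PgExistenceChecks() {}

  // Runs one of the queries above and returns the single boolean it yields.
  public static boolean exists(Connection conn, String query, String... params)
      throws SQLException {
    try (PreparedStatement stmt = conn.prepareStatement(query)) {
      for (int i = 0; i < params.length; i++) {
        stmt.setString(i + 1, params[i]); // JDBC parameters are 1-based
      }
      try (ResultSet rs = stmt.executeQuery()) {
        return rs.next() && rs.getBoolean(1);
      }
    }
  }

  // PostgreSQL folds unquoted identifiers to lower case, so the names passed
  // here have to match the spelling stored in the catalog.
  public static boolean tableExists(Connection conn, String schema, String table)
      throws SQLException {
    return exists(conn, CHECK_TABLE, schema, table);
  }

  public static boolean databaseExists(Connection conn, String database)
      throws SQLException {
    return exists(conn, CHECK_DATABASE, database);
  }

  public static boolean schemaExists(Connection conn, String schema)
      throws SQLException {
    return exists(conn, CHECK_SCHEMA, schema);
  }
}
```

With an open connection, a call such as PgExistenceChecks.tableExists(conn, "public", "testtable") would then replace the concatenated checkTableName query.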
