Paninka opened a new issue #13942:
URL: https://github.com/apache/pulsar/issues/13942


   **Describe the bug**
   I'm using the JDBC sink to write to PostgreSQL. Once the sink gets an error from Postgres, I can't ingest any more data, because every later write fails with this error: `current transaction is aborted, commands ignored until end of transaction block`
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Set up the JDBC PostgreSQL sink. I followed this quickstart:
   https://pulsar.apache.org/docs/en/io-quickstart/#connect-pulsar-to-postgresql
   
   2. Send two messages to the Pulsar topic from a Java client that has a schema definition set:
   ```
   getProducer(eventTopic).newMessage(Schema.AVRO(Esquema.class))
            .key("test")
            .value(m)
            .send();
   ```
   
   3. The Postgres table has a primary key, so inserting a duplicate key produces an error.
   
   ```
   CREATE TABLE test (
       id integer PRIMARY KEY,
       name text null
   );
   ```
   
   4. I sent two messages with the same primary key and got the error below. The sink then stays in the "transaction aborted" state, and I can't send new messages until I delete the sink.
   
   ```
   2022-01-25T13:22:49,236+0000 [pool-6-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception
   org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
           at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2533) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2268) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:313) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:448) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:369) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:159) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:148) ~[postgresql-42.2.12.jar:42.2.12]
           at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:203) ~[pulsar-io-jdbc-core-2.9.1.jar:?]
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
           at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
           at java.lang.Thread.run(Thread.java:829) [?:?]
   Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "id_pkey"
     Detail: Key (id)=(16377) already exists.
           at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2533) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2268) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:313) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:448) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:369) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:159) ~[postgresql-42.2.12.jar:42.2.12]
           at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:148) ~[postgresql-42.2.12.jar:42.2.12]
           at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:203) ~[pulsar-io-jdbc-core-2.9.1.jar:?]
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
           at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
           ... 3 more
   
   ```
   Every new message that I send to the topic is then ignored because of this aborted transaction block.
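
For illustration, here is a minimal sketch of the handling that would leave the connection usable after a failed insert. It assumes the inserts run with auto-commit disabled, which the error message suggests but which I have not confirmed in the sink's code. `RollbackOnFailure`, `Row`, and `flushBatch` are hypothetical names and not part of pulsar-io-jdbc; only the `test` table comes from this report. PostgreSQL rejects every statement in a failed transaction until it is rolled back, so the sketch calls `rollback()` when any statement in the batch fails.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical helper for illustration only; not the sink's actual code.
public class RollbackOnFailure {

    /** One row of the `test` table from this report. */
    public static final class Row {
        final int id;
        final String name;

        public Row(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /**
     * Inserts all rows in a single transaction.
     * Returns true if the batch was committed, false if it was rolled back.
     */
    public static boolean flushBatch(Connection conn, List<Row> rows) throws SQLException {
        conn.setAutoCommit(false); // assumption: statements run inside an explicit transaction
        String sql = "INSERT INTO test (id, name) VALUES (?, ?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (Row row : rows) {
                ps.setInt(1, row.id);
                ps.setString(2, row.name);
                ps.execute();
            }
            conn.commit();
            return true;
        } catch (SQLException e) {
            // End the failed transaction. Without this, PostgreSQL answers every
            // later statement on this connection with
            // "current transaction is aborted, commands ignored until end of transaction block".
            conn.rollback();
            return false;
        }
    }
}
```

With this shape, a duplicate key discards only the batch that contained it, and the next call to `flushBatch` starts a fresh transaction. Whether the failed messages should be negatively acknowledged or retried is a separate decision that the sketch does not cover.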
   
   **Desktop (please complete the following information):**
    - OS: Ubuntu 20.04
    - Pulsar: 2.9.1
    - Postgres: 12
   
   



