This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new d4869c8 pg-replication-slot fix: if PG connection got lost while
trying to send (#3389)
d4869c8 is described below
commit d4869c8ceadf6e7a79cae67a43bdd798c6b7b52d
Author: Bahaa Zaid <[email protected]>
AuthorDate: Sat Dec 7 10:03:17 2019 +0200
pg-replication-slot fix: if PG connection got lost while trying to send
(#3389)
the status to PG, the reconnect logic will never be called and the
exchange will be reprocessed forever.
Explanation:
After we receive the payload from PostgreSQL, we keep it in memory so we
can process it again and again in case the processing of the exchange
fails. After the successful completion of the exchange, we (1) send the
status to PostgreSQL, (2) reset the payload, and (3) we receive the next
one.
If for some reason we lose the connection while trying to do step 2, the
payload will never get reset, and it will be processed forever. We need
to reset the payload first so in case of failure in updating the status,
the next poll will reconnect and receive the next payload. The next
status update cover both payload as LSNs are sequential.
---
.../component/pg/replication/slot/PgReplicationSlotConsumer.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git
a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
index 5073f97..4390ae4 100644
---
a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
+++
b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
@@ -157,6 +157,10 @@ public class PgReplicationSlotConsumer extends
ScheduledPollConsumer {
private void processCommit(Exchange exchange) {
try {
+ // Reset the `payload` buffer first because it's already
processed, and in case of losing the connection
+ // while updating the status, the next poll will try to reconnect
again instead of processing the stale payload.
+ this.payload = null;
+
PGReplicationStream stream = getStream();
if (stream == null) {
@@ -166,8 +170,6 @@ public class PgReplicationSlotConsumer extends
ScheduledPollConsumer {
stream.setAppliedLSN(stream.getLastReceiveLSN());
stream.setFlushedLSN(stream.getLastReceiveLSN());
stream.forceUpdateStatus();
-
- this.payload = null;
} catch (SQLException e) {
getExceptionHandler().handleException("Exception while sending
feedback to PostgreSQL.", exchange, e);
}