This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 46b1f67  NIFI-8631: Ensure that GCP Pub/Sub messages are not 
acknowledged until session has been committed, in order to ensure that we don't 
have data loss
46b1f67 is described below

commit 46b1f6755c5ca3cc4bdb55e48fd09e1216b66d71
Author: Mark Payne <[email protected]>
AuthorDate: Wed May 26 11:34:51 2021 -0400

    NIFI-8631: Ensure that GCP Pub/Sub messages are not acknowledged until 
session has been committed, in order to ensure that we don't have data loss
    
    This closes #5102.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../processors/gcp/pubsub/ConsumeGCPubSub.java     | 32 +++++++++++++---------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 5693721..70b9e26 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -130,11 +131,10 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubProcessor {
     }
 
     @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         if (subscriber == null) {
-
             if (storedException.get() != null) {
-                getLogger().error("Failed to create Google Cloud PubSub 
subscriber due to {}", new Object[]{storedException.get()});
+                getLogger().error("Failed to create Google Cloud PubSub 
subscriber due to {}", storedException.get());
             } else {
                 getLogger().error("Google Cloud PubSub Subscriber was not 
properly created. Yielding the processor...");
             }
@@ -145,6 +145,7 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubProcessor {
 
         final PullResponse pullResponse = 
subscriber.pullCallable().call(pullRequest);
         final List<String> ackIds = new ArrayList<>();
+        final String subscriptionName = getSubscriptionName(context);
 
         for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) 
{
             if (message.hasMessage()) {
@@ -164,20 +165,26 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubProcessor {
                 flowFile = session.write(flowFile, out -> 
out.write(message.getMessage().getData().toByteArray()));
 
                 session.transfer(flowFile, REL_SUCCESS);
-                session.getProvenanceReporter().receive(flowFile, 
getSubscriptionName(context));
+                session.getProvenanceReporter().receive(flowFile, 
subscriptionName);
             }
         }
 
-        if (!ackIds.isEmpty()) {
-            AcknowledgeRequest acknowledgeRequest = 
AcknowledgeRequest.newBuilder()
-                    .addAllAckIds(ackIds)
-                    .setSubscription(getSubscriptionName(context))
-                    .build();
-            subscriber.acknowledgeCallable().call(acknowledgeRequest);
+        session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));
+    }
+
+    private void acknowledgeAcks(final Collection<String> ackIds, final String 
subscriptionName) {
+        if (ackIds == null || ackIds.isEmpty()) {
+            return;
         }
+
+        AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
+            .addAllAckIds(ackIds)
+            .setSubscription(subscriptionName)
+            .build();
+        subscriber.acknowledgeCallable().call(acknowledgeRequest);
     }
 
-    private String getSubscriptionName(ProcessContext context) {
+    private String getSubscriptionName(final ProcessContext context) {
         final String subscriptionName = 
context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
         final String projectId = 
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
 
@@ -189,8 +196,7 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubProcessor {
 
     }
 
-    private SubscriberStub getSubscriber(ProcessContext context) throws 
IOException {
-
+    private SubscriberStub getSubscriber(final ProcessContext context) throws 
IOException {
         final SubscriberStubSettings subscriberStubSettings = 
SubscriberStubSettings.newBuilder()
                 
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
                 .build();

Reply via email to