This is an automated email from the ASF dual-hosted git repository.

samaitra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 173f116  Added acknowledgement for pub/sub Streamer - Fixes #8.
173f116 is described below

commit 173f1166404eb017162a124d26df6bcb5fd99ba5
Author: gkatzioura <[email protected]>
AuthorDate: Sat Mar 14 15:00:34 2020 -0500

    Added acknowledgement for pub/sub Streamer - Fixes #8.
    
    Signed-off-by: samaitra <[email protected]>
---
 .../ignite/stream/pubsub/PubSubStreamer.java       | 10 ++++++++
 .../ignite/stream/pubsub/MockPubSubServer.java     | 28 ++++++++++++++++++----
 2 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
index 13384d2..3a4b689 100644
--- a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
+++ b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
@@ -35,6 +35,7 @@ import org.apache.ignite.stream.StreamAdapter;
 import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
 import com.google.cloud.pubsub.v1.stub.SubscriberStub;
 import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
 import com.google.pubsub.v1.ProjectTopicName;
 import com.google.pubsub.v1.PubsubMessage;
 import com.google.pubsub.v1.PullRequest;
@@ -224,9 +225,18 @@ public class PubSubStreamer<K,V> extends StreamAdapter<PubsubMessage, K, V> {
 
                     PullResponse pullResponse = 
subscriberStub.pullCallable().call(pullRequest);
 
+                    List<String> ackIds = new ArrayList<>();
                     for (ReceivedMessage message : 
pullResponse.getReceivedMessagesList()) {
                         addMessage(message.getMessage());
+                        ackIds.add(message.getAckId());
                     }
+
+                    AcknowledgeRequest acknowledgeRequest = 
AcknowledgeRequest.newBuilder()
+                                                                              
.setSubscription(subscriptionName)
+                                                                              
.addAllAckIds(ackIds)
+                                                                              
.build();
+
+                    
subscriberStub.acknowledgeCallable().call(acknowledgeRequest);
                 }
             } finally {
                 subscriberStub.close();
diff --git a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
index 20fe767..714961c 100644
--- a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
+++ b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
@@ -24,15 +24,15 @@ import com.google.api.gax.grpc.GrpcTransportChannel;
 import com.google.api.gax.rpc.FixedTransportChannelProvider;
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.protobuf.Empty;
+import com.google.pubsub.v1.AcknowledgeRequest;
 import com.google.pubsub.v1.PubsubMessage;
 import com.google.pubsub.v1.PullRequest;
 import com.google.pubsub.v1.PullResponse;
 import com.google.pubsub.v1.ReceivedMessage;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
@@ -68,7 +68,6 @@ class MockPubSubServer {
     public static final int MESSAGES_PER_REQUEST = 10;
 
     private final Map<String, Publisher> publishers = new HashMap<>();
-    private final List<PubsubMessage> topicMessages = new ArrayList<>();
     private final Queue<PubsubMessage> blockingQueue = new 
LinkedBlockingDeque<>();
 
     public SubscriberStubSettings createSubscriberStub() throws IOException {
@@ -87,10 +86,31 @@ class MockPubSubServer {
     @NotNull
     private ManagedChannel managedChannel() {
         ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class);
-        
when(managedChannel.newCall(any(MethodDescriptor.class),any(CallOptions.class))).thenAnswer((la)
 -> clientCall());
+        
when(managedChannel.newCall(any(MethodDescriptor.class),any(CallOptions.class))).thenAnswer((la)
 -> {
+            MethodDescriptor methodDescriptor = (MethodDescriptor) 
la.getArguments()[0];
+            
if(methodDescriptor.getFullMethodName().equals("google.pubsub.v1.Subscriber/Acknowledge"))
 {
+                return acknowledgeCall();
+            }
+
+            return clientCall();
+        });
         return managedChannel;
     }
 
+    private ClientCall<AcknowledgeRequest, Empty> acknowledgeCall() {
+        ClientCall<AcknowledgeRequest, Empty> clientCall = 
Mockito.mock(ClientCall.class);
+        doAnswer(iom -> {
+                     Object[] arguments = iom.getArguments();
+                     ClientCall.Listener<Empty> listener = 
(ClientCall.Listener<Empty>) arguments[0];
+                     listener.onMessage(Empty.getDefaultInstance());
+                     Metadata metadata = (Metadata) arguments[1];
+                     listener.onClose(Status.OK, metadata);
+                     return null;
+                 }
+                
).when(clientCall).start(any(ClientCall.Listener.class),any(Metadata.class));
+        return clientCall;
+    }
+
     private ClientCall<PullRequest, PullResponse> clientCall() {
         ClientCall<PullRequest, PullResponse> clientCall = 
Mockito.mock(ClientCall.class);
 

Reply via email to