This is an automated email from the ASF dual-hosted git repository.
samaitra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 173f116 Added acknowledgement for pub/sub Streamer - Fixes #8.
173f116 is described below
commit 173f1166404eb017162a124d26df6bcb5fd99ba5
Author: gkatzioura <[email protected]>
AuthorDate: Sat Mar 14 15:00:34 2020 -0500
Added acknowledgement for pub/sub Streamer - Fixes #8.
Signed-off-by: samaitra <[email protected]>
---
.../ignite/stream/pubsub/PubSubStreamer.java | 10 ++++++++
.../ignite/stream/pubsub/MockPubSubServer.java | 28 ++++++++++++++++++----
2 files changed, 34 insertions(+), 4 deletions(-)
diff --git a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
index 13384d2..3a4b689 100644
--- a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
+++ b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
@@ -35,6 +35,7 @@ import org.apache.ignite.stream.StreamAdapter;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
@@ -224,9 +225,18 @@ public class PubSubStreamer<K,V> extends StreamAdapter<PubsubMessage, K, V> {
PullResponse pullResponse =
subscriberStub.pullCallable().call(pullRequest);
+ List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message :
pullResponse.getReceivedMessagesList()) {
addMessage(message.getMessage());
+ ackIds.add(message.getAckId());
}
+
+ AcknowledgeRequest acknowledgeRequest =
AcknowledgeRequest.newBuilder()
+
.setSubscription(subscriptionName)
+
.addAllAckIds(ackIds)
+
.build();
+
+
subscriberStub.acknowledgeCallable().call(acknowledgeRequest);
}
} finally {
subscriberStub.close();
diff --git a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
index 20fe767..714961c 100644
--- a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
+++ b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
@@ -24,15 +24,15 @@ import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.protobuf.Empty;
+import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
@@ -68,7 +68,6 @@ class MockPubSubServer {
public static final int MESSAGES_PER_REQUEST = 10;
private final Map<String, Publisher> publishers = new HashMap<>();
- private final List<PubsubMessage> topicMessages = new ArrayList<>();
private final Queue<PubsubMessage> blockingQueue = new
LinkedBlockingDeque<>();
public SubscriberStubSettings createSubscriberStub() throws IOException {
@@ -87,10 +86,31 @@ class MockPubSubServer {
@NotNull
private ManagedChannel managedChannel() {
ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class);
-
when(managedChannel.newCall(any(MethodDescriptor.class),any(CallOptions.class))).thenAnswer((la)
-> clientCall());
+
when(managedChannel.newCall(any(MethodDescriptor.class),any(CallOptions.class))).thenAnswer((la)
-> {
+ MethodDescriptor methodDescriptor = (MethodDescriptor)
la.getArguments()[0];
+
if(methodDescriptor.getFullMethodName().equals("google.pubsub.v1.Subscriber/Acknowledge"))
{
+ return acknowledgeCall();
+ }
+
+ return clientCall();
+ });
return managedChannel;
}
+ private ClientCall<AcknowledgeRequest, Empty> acknowledgeCall() {
+ ClientCall<AcknowledgeRequest, Empty> clientCall =
Mockito.mock(ClientCall.class);
+ doAnswer(iom -> {
+ Object[] arguments = iom.getArguments();
+ ClientCall.Listener<Empty> listener =
(ClientCall.Listener<Empty>) arguments[0];
+ listener.onMessage(Empty.getDefaultInstance());
+ Metadata metadata = (Metadata) arguments[1];
+ listener.onClose(Status.OK, metadata);
+ return null;
+ }
+
).when(clientCall).start(any(ClientCall.Listener.class),any(Metadata.class));
+ return clientCall;
+ }
+
private ClientCall<PullRequest, PullResponse> clientCall() {
ClientCall<PullRequest, PullResponse> clientCall =
Mockito.mock(ClientCall.class);