Fixed adding timestamp and id attributes to pubsub messages

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f77a8580
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f77a8580
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f77a8580

Branch: refs/heads/tez-runner
Commit: f77a8580700b1cd79e0c824f171cccd8e2f332b3
Parents: 30886ac
Author: Nigel Kilmer <nkil...@google.com>
Authored: Mon Jul 17 18:09:57 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Nov 15 15:44:23 2017 -0800

----------------------------------------------------------------------
 .../sdk/io/gcp/pubsub/PubsubJsonClient.java     | 34 ++++++-------
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 53 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f77a8580/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index b745422..ab08813 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -135,23 +135,7 @@ class PubsubJsonClient extends PubsubClient {
     List<PubsubMessage> pubsubMessages = new 
ArrayList<>(outgoingMessages.size());
     for (OutgoingMessage outgoingMessage : outgoingMessages) {
       PubsubMessage pubsubMessage = new 
PubsubMessage().encodeData(outgoingMessage.elementBytes);
-
-      Map<String, String> attributes = outgoingMessage.attributes;
-      if ((timestampAttribute != null || idAttribute != null) && attributes == 
null) {
-        attributes = new TreeMap<>();
-      }
-      if (attributes != null) {
-        pubsubMessage.setAttributes(attributes);
-      }
-
-      if (timestampAttribute != null) {
-        attributes.put(timestampAttribute, 
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
-      }
-
-      if (idAttribute != null && 
!Strings.isNullOrEmpty(outgoingMessage.recordId)) {
-        attributes.put(idAttribute, outgoingMessage.recordId);
-      }
-
+      pubsubMessage.setAttributes(getMessageAttributes(outgoingMessage));
       pubsubMessages.add(pubsubMessage);
     }
     PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
@@ -162,6 +146,22 @@ class PubsubJsonClient extends PubsubClient {
     return response.getMessageIds().size();
   }
 
+  private Map<String, String> getMessageAttributes(OutgoingMessage 
outgoingMessage) {
+    Map<String, String> attributes = null;
+    if (outgoingMessage.attributes == null) {
+      attributes = new TreeMap<>();
+    } else {
+      attributes = new TreeMap<>(outgoingMessage.attributes);
+    }
+    if (timestampAttribute != null) {
+      attributes.put(timestampAttribute, 
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+    }
+    if (idAttribute != null && 
!Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+      attributes.put(idAttribute, outgoingMessage.recordId);
+    }
+    return attributes;
+  }
+
   @Override
   public List<IncomingMessage> pull(
       long requestTimeMsSinceEpoch,

http://git-wip-us.apache.org/repos/asf/beam/blob/f77a8580/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index 578f814..cbb24f2 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -136,4 +136,57 @@ public class PubsubJsonClientTest {
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }
+
+  @Test
+  public void publishOneMessageWithOnlyTimestampAndIdAttributes() throws 
IOException {
+    String expectedTopic = TOPIC.getPath();
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .encodeData(DATA.getBytes())
+        .setAttributes(
+            ImmutableMap.<String, String> builder()
+                    .put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME))
+                    .put(ID_ATTRIBUTE, RECORD_ID).build());
+    PublishRequest expectedRequest = new PublishRequest()
+        .setMessages(ImmutableList.of(expectedPubsubMessage));
+    PublishResponse expectedResponse = new PublishResponse()
+        .setMessageIds(ImmutableList.of(MESSAGE_ID));
+    Mockito.when((Object) (mockPubsub.projects()
+                                .topics()
+                                .publish(expectedTopic, expectedRequest)
+                                .execute()))
+           .thenReturn(expectedResponse);
+    OutgoingMessage actualMessage = new OutgoingMessage(
+        DATA.getBytes(), ImmutableMap.<String, String>of(), MESSAGE_TIME, 
RECORD_ID);
+    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+    assertEquals(1, n);
+  }
+
+  @Test
+  public void publishOneMessageWithNoTimestampOrIdAttribute() throws 
IOException {
+    // For this test, create a new PubsubJsonClient without the timestamp 
attribute
+    // or id attribute set.
+    client = new PubsubJsonClient(null, null, mockPubsub);
+
+    String expectedTopic = TOPIC.getPath();
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .encodeData(DATA.getBytes())
+        .setAttributes(
+            ImmutableMap.<String, String> builder()
+                    .put("k", "v").build());
+    PublishRequest expectedRequest = new PublishRequest()
+        .setMessages(ImmutableList.of(expectedPubsubMessage));
+    PublishResponse expectedResponse = new PublishResponse()
+        .setMessageIds(ImmutableList.of(MESSAGE_ID));
+    Mockito.when((Object) (mockPubsub.projects()
+                                .topics()
+                                .publish(expectedTopic, expectedRequest)
+                                .execute()))
+           .thenReturn(expectedResponse);
+    Map<String, String> attrs = new HashMap<>();
+    attrs.put("k", "v");
+    OutgoingMessage actualMessage = new OutgoingMessage(
+            DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID);
+    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+    assertEquals(1, n);
+  }
 }

Reply via email to