Repository: beam Updated Branches: refs/heads/master b1c287bd5 -> 1e2ad65f8
Fix PubSubIO write attribute issue Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ad9df5b5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ad9df5b5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ad9df5b5 Branch: refs/heads/master Commit: ad9df5b5591ce9d153039ac91e8862af6ea42b45 Parents: b1c287b Author: Chen Bin <[email protected]> Authored: Thu Mar 9 11:09:04 2017 +0800 Committer: Dan Halperin <[email protected]> Committed: Thu Mar 30 20:15:58 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/util/PubsubJsonClient.java | 2 +- .../org/apache/beam/sdk/util/PubsubJsonClientTest.java | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ad9df5b5/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java index 6bc104f..ef8abfd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java @@ -135,7 +135,7 @@ public class PubsubJsonClient extends PubsubClient { for (OutgoingMessage outgoingMessage : outgoingMessages) { PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes); - Map<String, String> attributes = pubsubMessage.getAttributes(); + Map<String, String> attributes = outgoingMessage.attributes; if ((timestampLabel != null || idLabel != null) && attributes == null) { attributes = new TreeMap<>(); } http://git-wip-us.apache.org/repos/asf/beam/blob/ad9df5b5/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java index 17e1870..019190b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java @@ -30,7 +30,10 @@ import com.google.api.services.pubsub.model.ReceivedMessage; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; + import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; @@ -114,8 +117,10 @@ public class PubsubJsonClientTest { PubsubMessage expectedPubsubMessage = new PubsubMessage() .encodeData(DATA.getBytes()) .setAttributes( - ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), - ID_LABEL, RECORD_ID)); + ImmutableMap.<String, String> builder() + .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME)) + .put(ID_LABEL, RECORD_ID) + .put("k", "v").build()); PublishRequest expectedRequest = new PublishRequest() .setMessages(ImmutableList.of(expectedPubsubMessage)); PublishResponse expectedResponse = new PublishResponse() @@ -125,8 +130,10 @@ public class PubsubJsonClientTest { .publish(expectedTopic, expectedRequest) .execute())) .thenReturn(expectedResponse); + Map<String, String> attrs = new HashMap<>(); + attrs.put("k", "v"); OutgoingMessage actualMessage = new OutgoingMessage( - DATA.getBytes(), null, MESSAGE_TIME, RECORD_ID); + DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); }
