Chad Dombrova created BEAM-8085:
-----------------------------------
Summary: PubsubMessageWithAttributesCoder should not produce null
attributes map
Key: BEAM-8085
URL: https://issues.apache.org/jira/browse/BEAM-8085
Project: Beam
Issue Type: Improvement
Components: io-java-gcp
Reporter: Chad Dombrova
Hi, I just got caught by an issue where PubsubMessage.getAttributeMap()
returned null, because the message was created by
PubsubMessageWithAttributesCoder which uses a NullableCoder for attributes.
Here are the relevant code snippets:
{code:java}
public class PubsubMessageWithAttributesCoder extends
CustomCoder<PubsubMessage> {
// A message's payload can not be null
private static final Coder<byte[]> PAYLOAD_CODER = ByteArrayCoder.of();
// A message's attributes can be null.
private static final Coder<Map<String, String>> ATTRIBUTES_CODER =
NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
@Override
public PubsubMessage decode(InputStream inStream) throws IOException {
return decode(inStream, Context.NESTED);
}
@Override
public PubsubMessage decode(InputStream inStream, Context context) throws
IOException {
byte[] payload = PAYLOAD_CODER.decode(inStream);
Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context);
return new PubsubMessage(payload, attributes);
}
}
{code}
{code:java}
public class PubsubMessage {
private byte[] message;
private Map<String, String> attributes;
public PubsubMessage(byte[] payload, Map<String, String> attributes) {
this.message = payload;
this.attributes = attributes;
}
/** Returns the main PubSub message. */
public byte[] getPayload() {
return message;
}
/** Returns the given attribute value. If not such attribute exists, returns
null. */
@Nullable
public String getAttribute(String attribute) {
checkNotNull(attribute, "attribute");
return attributes.get(attribute);
}
/** Returns the full map of attributes. This is an unmodifiable map. */
public Map<String, String> getAttributeMap() {
return attributes;
}
}
{code}
There are a handful of potential solutions:
# Remove the NullableCoder
# In PubsubMessageWithAttributesCoder.decode, check for null and create an
empty Map before instantiating PubsubMessage
# Allow attributes to be null for PubsubMessage constructor, but create an
empty Map if it is (similar to above, but handle it in PubsubMessage)
# Allow PubsubMessage.attributes to be nullable, and indicate it as such
--
This message was sent by Atlassian Jira
(v8.3.2#803003)