Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2588#discussion_r173633306 --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/EventHubMessage.java --- @@ -0,0 +1,151 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.storm.eventhubs.core; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Map; + +import org.apache.storm.eventhubs.format.SerializeDeserializeUtil; + +import com.microsoft.azure.eventhubs.EventData; + +/** + * Represents a message from EventHub. Encapsulates the actual pay load received + * from EventHub. + * <p> + * It encapsulates the raw bytes from the content, any AMQP application + * properties set, and the system properties (partition key, offset, enqueue + * time, sequence number, and publisher) set on the Eventhub message. + */ +public class EventHubMessage implements Comparable<EventHubMessage> { + private final byte[] content; + private final String partitionId; + private final String partitionKey; + private final String offset; + private final Instant enqueuedTime; + private final long sequenceNumber; + private final String publisher; + private final MessageId messageId; + + private final Map<String, Object> applicationProperties; + private final Map<String, Object> systemProperties; + + public EventHubMessage(EventData eventdata, String partitionId) { + this.partitionId = partitionId; + + if (eventdata.getBytes() != null) { + content = eventdata.getBytes(); + } else if (eventdata.getObject() != null) { + try { + content = SerializeDeserializeUtil.serialize(eventdata.getObject()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + throw new RuntimeException("Failed to retrieve payload from EventData"); + } + + applicationProperties = eventdata.getProperties(); + EventData.SystemProperties props = eventdata.getSystemProperties(); + systemProperties = props; --- End diff -- Nit: The props variable seems unnecessary, could just assign directly
---