This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3914ad0754d Solace Read connector: data classes and mapper (#31637)
3914ad0754d is described below
commit 3914ad0754dbd4904ddb079044672f6c1df830a0
Author: Bartosz Zablocki <[email protected]>
AuthorDate: Fri Jun 21 05:23:09 2024 +0200
Solace Read connector: data classes and mapper (#31637)
* Add data classes
* Remove obvious tests of AutoValue classes
* Remove @DefaultSchema and @SchemaFieldNumber annotations
---
sdks/java/io/solace/build.gradle | 1 +
.../org/apache/beam/sdk/io/solace/data/Solace.java | 319 +++++++++++++++++++--
2 files changed, 302 insertions(+), 18 deletions(-)
diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle
index b33b8fb1802..c317b566618 100644
--- a/sdks/java/io/solace/build.gradle
+++ b/sdks/java/io/solace/build.gradle
@@ -36,6 +36,7 @@ dependencies {
implementation library.java.joda_time
implementation library.java.solace
implementation project(":sdks:java:extensions:avro")
+ implementation library.java.vendored_grpc_1_60_1
implementation library.java.avro
permitUnusedDeclared library.java.avro
}
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
index 076a16b96ce..79057445a4e 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
@@ -17,26 +17,23 @@
*/
package org.apache.beam.sdk.io.solace.data;
+import com.google.auto.value.AutoValue;
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * A record to be written to a Solace topic.
- *
- * <p>You need to transform to {@link Solace.Record} to be able to write to
Solace. For that, you
- * can use the {@link Solace.Record.Builder} provided with this class.
- *
- * <p>For instance, to create a record, use the following code:
- *
- * <pre>{@code
- * Solace.Record record = Solace.Record.builder()
- * .setMessageId(messageId)
- * .setSenderTimestamp(timestampMillis)
- * .setPayload(payload)
- * .build();
- * }</pre>
- *
- * Setting the message id and the timestamp is mandatory.
+ * Provides core data models and utilities for working with Solace messages in
the context of Apache
+ * Beam pipelines. This class includes representations for Solace topics,
queues, destinations, and
+ * message records, as well as a utility for converting Solace messages into
Beam-compatible
+ * records.
*/
public class Solace {
-
+ /** Represents a Solace queue. */
public static class Queue {
private final String name;
@@ -52,7 +49,7 @@ public class Solace {
return name;
}
}
-
+ /** Represents a Solace topic. */
public static class Topic {
private final String name;
@@ -68,4 +65,290 @@ public class Solace {
return name;
}
}
+ /** Represents a Solace destination type. */
+ public enum DestinationType {
+ TOPIC,
+ QUEUE,
+ UNKNOWN
+ }
+
+ /** Represents a Solace message destination (either a Topic or a Queue). */
+ @AutoValue
+ public abstract static class Destination {
+ /**
+ * Gets the name of the destination.
+ *
+ * @return The destination name.
+ */
+ public abstract String getName();
+
+ /**
+ * Gets the type of the destination (TOPIC, QUEUE or UNKNOWN).
+ *
+ * @return The destination type.
+ */
+ public abstract DestinationType getType();
+
+ static Builder builder() {
+ return new AutoValue_Solace_Destination.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setName(String name);
+
+ abstract Builder setType(DestinationType type);
+
+ abstract Destination build();
+ }
+ }
+
+ /** Represents a Solace message record with its associated metadata. */
+ @AutoValue
+ public abstract static class Record {
+ /**
+ * Gets the unique identifier of the message, a string for an
application-specific message
+ * identifier.
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getApplicationMessageId()}
+ *
+ * @return The message ID, or null if not available.
+ */
+ public abstract @Nullable String getMessageId();
+
+ /**
+ * Gets the payload of the message as a ByteString.
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getBytes()}
+ *
+ * @return The message payload.
+ */
+ public abstract ByteString getPayload();
+ /**
+ * Gets the destination (topic or queue) to which the message was sent.
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getDestination()}
+ *
+ * @return The destination, or null if not available.
+ */
+ public abstract @Nullable Destination getDestination();
+
+ /**
+ * Gets the message expiration time in milliseconds since the Unix epoch.
+ *
+ * <p>A value of 0 indicates the message does not expire.
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getExpiration()}
+ *
+ * @return The expiration timestamp.
+ */
+ public abstract long getExpiration();
+
+ /**
+ * Gets the priority level of the message (0-255, higher is more
important). -1 if not set.
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getPriority()}
+ *
+ * @return The message priority.
+ */
+ public abstract int getPriority();
+
+ /**
+ * Indicates whether the message has been redelivered due to a prior
delivery failure.
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getRedelivered()}
+ *
+ * @return True if redelivered, false otherwise.
+ */
+ public abstract boolean getRedelivered();
+
+ /**
+ * Gets the destination to which replies to this message should be sent.
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getReplyTo()}
+ *
+ * @return The reply-to destination, or null if not specified.
+ */
+ public abstract @Nullable Destination getReplyTo();
+
+ /**
+ * Gets the timestamp (in milliseconds since the Unix epoch) when the
message was received by
+ * the Solace broker.
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getReceiveTimestamp()}
+ *
+ * @return The timestamp.
+ */
+ public abstract long getReceiveTimestamp();
+
+ /**
+ * Gets the timestamp (in milliseconds since the Unix epoch) when the
message was sent by the
+ * sender. Can be null if not provided.
+ *
+ * @return The sender timestamp, or null if not available.
+ */
+ public abstract @Nullable Long getSenderTimestamp();
+
+ /**
+ * Gets the sequence number of the message (if applicable).
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getSequenceNumber()}
+ *
+ * @return The sequence number, or null if not available.
+ */
+ public abstract @Nullable Long getSequenceNumber();
+
+ /**
+ * The number of milliseconds before the message is discarded or moved to
Dead Message Queue. A
+ * value of 0 means the message will never expire. The default value is 0.
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getTimeToLive()}
+ *
+ * @return The time-to-live value.
+ */
+ public abstract long getTimeToLive();
+
+ /**
+ * Gets the ID for the message within its replication group (if
applicable).
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getReplicationGroupMessageId()}
+ *
+ * <p>The ID for a particular message is only guaranteed to be the same
for a particular copy of
+ * a message on a particular queue or topic endpoint within a replication
group. The same
+ * message on different queues or topic endpoints within the same
replication group may or may
+ * not have the same replication group message ID. See more at <a
+ *
href="https://docs.solace.com/API/API-Developer-Guide/Detecting-Duplicate-Mess.htm">https://docs.solace.com/API/API-Developer-Guide/Detecting-Duplicate-Mess.htm</a>
+ *
+ * @return The replication group message ID, or null if not present.
+ */
+ public abstract @Nullable String getReplicationGroupMessageId();
+ /**
+ * Gets the attachment data of the message as a ByteString, if any. This
might represent files
+ * or other binary content associated with the message.
+ *
+ * <p>Mapped from {@link BytesXMLMessage#getAttachmentByteBuffer()}
+ *
+ * @return The attachment data, or an empty ByteString if no attachment is
present.
+ */
+ public abstract ByteString getAttachmentBytes();
+
+ static Builder builder() {
+ return new AutoValue_Solace_Record.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setMessageId(@Nullable String messageId);
+
+ abstract Builder setPayload(ByteString payload);
+
+ abstract Builder setDestination(@Nullable Destination destination);
+
+ abstract Builder setExpiration(long expiration);
+
+ abstract Builder setPriority(int priority);
+
+ abstract Builder setRedelivered(boolean redelivered);
+
+ abstract Builder setReplyTo(@Nullable Destination replyTo);
+
+ abstract Builder setReceiveTimestamp(long receiveTimestamp);
+
+ abstract Builder setSenderTimestamp(@Nullable Long senderTimestamp);
+
+ abstract Builder setSequenceNumber(@Nullable Long sequenceNumber);
+
+ abstract Builder setTimeToLive(long timeToLive);
+
+ abstract Builder setReplicationGroupMessageId(@Nullable String
replicationGroupMessageId);
+
+ abstract Builder setAttachmentBytes(ByteString attachmentBytes);
+
+ abstract Record build();
+ }
+ }
+ /**
+ * A utility class for mapping {@link BytesXMLMessage} instances to {@link
Solace.Record} objects.
+ * This simplifies the process of converting raw Solace messages into a
format suitable for use
+ * within Apache Beam pipelines.
+ */
+ public static class SolaceRecordMapper {
+ private static final Logger LOG =
LoggerFactory.getLogger(SolaceRecordMapper.class);
+ /**
+ * Maps a {@link BytesXMLMessage} (if not null) to a {@link Solace.Record}.
+ *
+ * <p>Extracts relevant information from the message, including payload,
metadata, and
+ * destination details.
+ *
+ * @param msg The Solace message to map.
+ * @return A Solace Record representing the message, or null if the input
message was null.
+ */
+ public static @Nullable Record map(@Nullable BytesXMLMessage msg) {
+ if (msg == null) {
+ return null;
+ }
+
+ ByteArrayOutputStream payloadBytesStream = new ByteArrayOutputStream();
+ if (msg.getContentLength() != 0) {
+ try {
+ payloadBytesStream.write(msg.getBytes());
+ } catch (IOException e) {
+ LOG.error("Could not write bytes from the BytesXMLMessage to the
Solace.record.", e);
+ }
+ }
+
+ ByteArrayOutputStream attachmentBytesStream = new
ByteArrayOutputStream();
+ if (msg.getAttachmentContentLength() != 0) {
+ try {
+ attachmentBytesStream.write(msg.getAttachmentByteBuffer().array());
+ } catch (IOException e) {
+ LOG.error(
+ "Could not AttachmentByteBuffer from the BytesXMLMessage to the
Solace.record.", e);
+ }
+ }
+
+ Destination replyTo = getDestination(msg.getCorrelationId(),
msg.getReplyTo());
+ Destination destination = getDestination(msg.getCorrelationId(),
msg.getDestination());
+
+ return Record.builder()
+ .setMessageId(msg.getApplicationMessageId())
+ .setPayload(ByteString.copyFrom(payloadBytesStream.toByteArray()))
+ .setDestination(destination)
+ .setExpiration(msg.getExpiration())
+ .setPriority(msg.getPriority())
+ .setRedelivered(msg.getRedelivered())
+ .setReplyTo(replyTo)
+ .setReceiveTimestamp(msg.getReceiveTimestamp())
+ .setSenderTimestamp(msg.getSenderTimestamp())
+ .setSequenceNumber(msg.getSequenceNumber())
+ .setTimeToLive(msg.getTimeToLive())
+ .setReplicationGroupMessageId(
+ msg.getReplicationGroupMessageId() != null
+ ? msg.getReplicationGroupMessageId().toString()
+ : null)
+
.setAttachmentBytes(ByteString.copyFrom(attachmentBytesStream.toByteArray()))
+ .build();
+ }
+
+ private static @Nullable Destination getDestination(
+ String msgId, com.solacesystems.jcsmp.Destination
originalDestinationField) {
+ if (originalDestinationField == null) {
+ return null;
+ }
+ Destination.Builder destinationBuilder =
+ Destination.builder().setName(originalDestinationField.getName());
+ if (originalDestinationField instanceof com.solacesystems.jcsmp.Topic) {
+ destinationBuilder.setType(DestinationType.TOPIC);
+ } else if (originalDestinationField instanceof
com.solacesystems.jcsmp.Queue) {
+ destinationBuilder.setType(DestinationType.QUEUE);
+ } else {
+ LOG.error(
+ "SolaceIO: Unknown destination type type for message {}, setting
to {}",
+ msgId,
+ DestinationType.UNKNOWN.name());
+ destinationBuilder.setType(DestinationType.UNKNOWN);
+ }
+ return destinationBuilder.build();
+ }
+ }
}