This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3914ad0754d Solace Read connector: data classes and mapper (#31637)
3914ad0754d is described below

commit 3914ad0754dbd4904ddb079044672f6c1df830a0
Author: Bartosz Zablocki <[email protected]>
AuthorDate: Fri Jun 21 05:23:09 2024 +0200

    Solace Read connector: data classes and mapper (#31637)
    
    * Add data classes
    
    * Remove obvious tests of AutoValue classes
    
    * Remove @DefaultSchema and @SchemaFieldNumber annotations
---
 sdks/java/io/solace/build.gradle                   |   1 +
 .../org/apache/beam/sdk/io/solace/data/Solace.java | 319 +++++++++++++++++++--
 2 files changed, 302 insertions(+), 18 deletions(-)

diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle
index b33b8fb1802..c317b566618 100644
--- a/sdks/java/io/solace/build.gradle
+++ b/sdks/java/io/solace/build.gradle
@@ -36,6 +36,7 @@ dependencies {
     implementation library.java.joda_time
     implementation library.java.solace
     implementation project(":sdks:java:extensions:avro")
+    implementation library.java.vendored_grpc_1_60_1
     implementation library.java.avro
     permitUnusedDeclared library.java.avro
 }
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
index 076a16b96ce..79057445a4e 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
@@ -17,26 +17,23 @@
  */
 package org.apache.beam.sdk.io.solace.data;
 
+import com.google.auto.value.AutoValue;
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * A record to be written to a Solace topic.
- *
- * <p>You need to transform to {@link Solace.Record} to be able to write to Solace. For that, you
- * can use the {@link Solace.Record.Builder} provided with this class.
- *
- * <p>For instance, to create a record, use the following code:
- *
- * <pre>{@code
- * Solace.Record record = Solace.Record.builder()
- *         .setMessageId(messageId)
- *         .setSenderTimestamp(timestampMillis)
- *         .setPayload(payload)
- *         .build();
- * }</pre>
- *
- * Setting the message id and the timestamp is mandatory.
+ * Provides core data models and utilities for working with Solace messages in the context of Apache
+ * Beam pipelines. This class includes representations for Solace topics, queues, destinations, and
+ * message records, as well as a utility for converting Solace messages into Beam-compatible
+ * records.
  */
 public class Solace {
-
+  /** Represents a Solace queue. */
   public static class Queue {
     private final String name;
 
@@ -52,7 +49,7 @@ public class Solace {
       return name;
     }
   }
-
+  /** Represents a Solace topic. */
   public static class Topic {
     private final String name;
 
@@ -68,4 +65,290 @@ public class Solace {
       return name;
     }
   }
+  /** Represents a Solace destination type. */
+  public enum DestinationType {
+    TOPIC,
+    QUEUE,
+    UNKNOWN
+  }
+
+  /** Represents a Solace message destination (either a Topic or a Queue). */
+  @AutoValue
+  public abstract static class Destination {
+    /**
+     * Gets the name of the destination.
+     *
+     * @return The destination name.
+     */
+    public abstract String getName();
+
+    /**
+     * Gets the type of the destination (TOPIC, QUEUE or UNKNOWN).
+     *
+     * @return The destination type.
+     */
+    public abstract DestinationType getType();
+
+    static Builder builder() {
+      return new AutoValue_Solace_Destination.Builder();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setName(String name);
+
+      abstract Builder setType(DestinationType type);
+
+      abstract Destination build();
+    }
+  }
+
+  /** Represents a Solace message record with its associated metadata. */
+  @AutoValue
+  public abstract static class Record {
+    /**
+     * Gets the unique identifier of the message, a string for an application-specific message
+     * identifier.
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getApplicationMessageId()}
+     *
+     * @return The message ID, or null if not available.
+     */
+    public abstract @Nullable String getMessageId();
+
+    /**
+     * Gets the payload of the message as a ByteString.
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getBytes()}
+     *
+     * @return The message payload.
+     */
+    public abstract ByteString getPayload();
+    /**
+     * Gets the destination (topic or queue) to which the message was sent.
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getDestination()}
+     *
+     * @return The destination, or null if not available.
+     */
+    public abstract @Nullable Destination getDestination();
+
+    /**
+     * Gets the message expiration time in milliseconds since the Unix epoch.
+     *
+     * <p>A value of 0 indicates the message does not expire.
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getExpiration()}
+     *
+     * @return The expiration timestamp.
+     */
+    public abstract long getExpiration();
+
+    /**
+     * Gets the priority level of the message (0-255, higher is more important). -1 if not set.
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getPriority()}
+     *
+     * @return The message priority.
+     */
+    public abstract int getPriority();
+
+    /**
+     * Indicates whether the message has been redelivered due to a prior delivery failure.
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getRedelivered()}
+     *
+     * @return True if redelivered, false otherwise.
+     */
+    public abstract boolean getRedelivered();
+
+    /**
+     * Gets the destination to which replies to this message should be sent.
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getReplyTo()}
+     *
+     * @return The reply-to destination, or null if not specified.
+     */
+    public abstract @Nullable Destination getReplyTo();
+
+    /**
+     * Gets the timestamp (in milliseconds since the Unix epoch) when the message was received by
+     * the Solace broker.
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getReceiveTimestamp()}
+     *
+     * @return The timestamp.
+     */
+    public abstract long getReceiveTimestamp();
+
+    /**
+     * Gets the timestamp (in milliseconds since the Unix epoch) when the message was sent by the
+     * sender. Can be null if not provided.
+     *
+     * @return The sender timestamp, or null if not available.
+     */
+    public abstract @Nullable Long getSenderTimestamp();
+
+    /**
+     * Gets the sequence number of the message (if applicable).
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getSequenceNumber()}
+     *
+     * @return The sequence number, or null if not available.
+     */
+    public abstract @Nullable Long getSequenceNumber();
+
+    /**
+     * The number of milliseconds before the message is discarded or moved to Dead Message Queue. A
+     * value of 0 means the message will never expire. The default value is 0.
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getTimeToLive()}
+     *
+     * @return The time-to-live value.
+     */
+    public abstract long getTimeToLive();
+
+    /**
+     * Gets the ID for the message within its replication group (if applicable).
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getReplicationGroupMessageId()}
+     *
+     * <p>The ID for a particular message is only guaranteed to be the same for a particular copy of
+     * a message on a particular queue or topic endpoint within a replication group. The same
+     * message on different queues or topic endpoints within the same replication group may or may
+     * not have the same replication group message ID. See more at <a
+     * href="https://docs.solace.com/API/API-Developer-Guide/Detecting-Duplicate-Mess.htm">https://docs.solace.com/API/API-Developer-Guide/Detecting-Duplicate-Mess.htm</a>
+     *
+     * @return The replication group message ID, or null if not present.
+     */
+    public abstract @Nullable String getReplicationGroupMessageId();
+    /**
+     * Gets the attachment data of the message as a ByteString, if any. This might represent files
+     * or other binary content associated with the message.
+     *
+     * <p>Mapped from {@link BytesXMLMessage#getAttachmentByteBuffer()}
+     *
+     * @return The attachment data, or an empty ByteString if no attachment is present.
+     */
+    public abstract ByteString getAttachmentBytes();
+
+    static Builder builder() {
+      return new AutoValue_Solace_Record.Builder();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setMessageId(@Nullable String messageId);
+
+      abstract Builder setPayload(ByteString payload);
+
+      abstract Builder setDestination(@Nullable Destination destination);
+
+      abstract Builder setExpiration(long expiration);
+
+      abstract Builder setPriority(int priority);
+
+      abstract Builder setRedelivered(boolean redelivered);
+
+      abstract Builder setReplyTo(@Nullable Destination replyTo);
+
+      abstract Builder setReceiveTimestamp(long receiveTimestamp);
+
+      abstract Builder setSenderTimestamp(@Nullable Long senderTimestamp);
+
+      abstract Builder setSequenceNumber(@Nullable Long sequenceNumber);
+
+      abstract Builder setTimeToLive(long timeToLive);
+
+      abstract Builder setReplicationGroupMessageId(@Nullable String 
replicationGroupMessageId);
+
+      abstract Builder setAttachmentBytes(ByteString attachmentBytes);
+
+      abstract Record build();
+    }
+  }
+  /**
+   * A utility class for mapping {@link BytesXMLMessage} instances to {@link Solace.Record} objects.
+   * This simplifies the process of converting raw Solace messages into a format suitable for use
+   * within Apache Beam pipelines.
+   */
+  public static class SolaceRecordMapper {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SolaceRecordMapper.class);
+    /**
+     * Maps a {@link BytesXMLMessage} (if not null) to a {@link Solace.Record}.
+     *
+     * <p>Extracts relevant information from the message, including payload, metadata, and
+     * destination details.
+     *
+     * @param msg The Solace message to map.
+     * @return A Solace Record representing the message, or null if the input message was null.
+     */
+    public static @Nullable Record map(@Nullable BytesXMLMessage msg) {
+      if (msg == null) {
+        return null;
+      }
+
+      ByteArrayOutputStream payloadBytesStream = new ByteArrayOutputStream();
+      if (msg.getContentLength() != 0) {
+        try {
+          payloadBytesStream.write(msg.getBytes());
+        } catch (IOException e) {
+          LOG.error("Could not write bytes from the BytesXMLMessage to the 
Solace.record.", e);
+        }
+      }
+
+      ByteArrayOutputStream attachmentBytesStream = new 
ByteArrayOutputStream();
+      if (msg.getAttachmentContentLength() != 0) {
+        try {
+          attachmentBytesStream.write(msg.getAttachmentByteBuffer().array());
+        } catch (IOException e) {
+          LOG.error(
+              "Could not AttachmentByteBuffer from the BytesXMLMessage to the 
Solace.record.", e);
+        }
+      }
+
+      Destination replyTo = getDestination(msg.getCorrelationId(), 
msg.getReplyTo());
+      Destination destination = getDestination(msg.getCorrelationId(), 
msg.getDestination());
+
+      return Record.builder()
+          .setMessageId(msg.getApplicationMessageId())
+          .setPayload(ByteString.copyFrom(payloadBytesStream.toByteArray()))
+          .setDestination(destination)
+          .setExpiration(msg.getExpiration())
+          .setPriority(msg.getPriority())
+          .setRedelivered(msg.getRedelivered())
+          .setReplyTo(replyTo)
+          .setReceiveTimestamp(msg.getReceiveTimestamp())
+          .setSenderTimestamp(msg.getSenderTimestamp())
+          .setSequenceNumber(msg.getSequenceNumber())
+          .setTimeToLive(msg.getTimeToLive())
+          .setReplicationGroupMessageId(
+              msg.getReplicationGroupMessageId() != null
+                  ? msg.getReplicationGroupMessageId().toString()
+                  : null)
+          
.setAttachmentBytes(ByteString.copyFrom(attachmentBytesStream.toByteArray()))
+          .build();
+    }
+
+    private static @Nullable Destination getDestination(
+        String msgId, com.solacesystems.jcsmp.Destination 
originalDestinationField) {
+      if (originalDestinationField == null) {
+        return null;
+      }
+      Destination.Builder destinationBuilder =
+          Destination.builder().setName(originalDestinationField.getName());
+      if (originalDestinationField instanceof com.solacesystems.jcsmp.Topic) {
+        destinationBuilder.setType(DestinationType.TOPIC);
+      } else if (originalDestinationField instanceof 
com.solacesystems.jcsmp.Queue) {
+        destinationBuilder.setType(DestinationType.QUEUE);
+      } else {
+        LOG.error(
+            "SolaceIO: Unknown destination type type for message {}, setting 
to {}",
+            msgId,
+            DestinationType.UNKNOWN.name());
+        destinationBuilder.setType(DestinationType.UNKNOWN);
+      }
+      return destinationBuilder.build();
+    }
+  }
 }

Reply via email to