This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 18af8c837fa Solace Read connector (#31476)
18af8c837fa is described below

commit 18af8c837faf63aaee66f7071aef63835a9b9dc0
Author: Bartosz Zablocki <[email protected]>
AuthorDate: Fri Jun 21 21:47:56 2024 +0200

    Solace Read connector (#31476)
    
    * wip solace connector
    
    * wip solace connector
    
    * some checker errors resolved
    
    * all checker errors resolved
    
    * improving unit tests
    
    * respond to pr commments
    
    * Documentation
    
    * Small refactor - move data classes out of the client
    
    * refactor
    
    * Add github action for integration test of Solace
    
    * testing github workflow
    
    * bump testcontainers to 1.19.7 - soalce semp was updated with an admin 
user access
    
    * Use FlowHandle to acknowledge messages to make SolaceCheckpointMark's 
fields serializable.
    
    * Handle StaleSessionException error
    
    * Add @Internal annotation to mark the SolaceIO API beta and subject to 
change.
    
    * Improve documentation
    
    * Back to ack based on bytesxmlmessages. Deduplicate default to false.
    
    * update changes.md with Solace read connector
    
    * remove ack by id code
    
    * remove todo comment
    
    * Add licenses to package-info.java files
    
    * Restructure documentation
    
    * update aws test after upgrading testcontainers version.
    
    * Disable publishing docs until the first pass on the master branch
    
    * Remove files from this branch to split PR into smaller chunks
    
    * refactor tests for readability
    
    * revert upgrade of testcontainers - not needed in this PR chunk
    
    * revert upgrade of testcontainers - not needed in this PR chunk
    
    * spotless
    
    * remove IT tests from this pr
    
    * Tech Writer review
    
    * Add a field to Solace.Record mapped from 
BytesXMLMessage.getAttachmentByteBuffer()
    
    * Add and fix some documentation
    
    * Remove CheckpointMark's reference to the UnboundedSolaceReader - 
unnecessary.
    
    * Revert "Remove CheckpointMark's reference to the UnboundedSolaceReader - 
unnecessary."
    
    This reverts commit 2e1c10e0b4c0f124af779ee4f284fcc79ccc8fc9.
    
    * Solace project init - github workflow file, gradle module
    
    * Splitting the #31476 - Leaving only PTransform AutoValue configurations
    
    * remove unnecessary dependencies
    
    * remove info from CHANGES.md
    
    * Add watermark-related code
    
    * Remove excessive @Nullable annotations on Solace.Record class
    
    * Remove entry from CHANGES.md
    
    * Make Record builder package-private
    
    * Refactor SolaceIO - the constructor of Read takes a configuration builder 
as an argument
    
    * Change payload and attachments type to immutable ByteString
    
    * Downgrade Record builders access modifiers to package private
    
    * Add documentation
    
    * Add documentation to classes and methods in Solace.java
    
    * typo
    
    * Add SolaceCheckpointMark.java
    
    * Make SolaceCheckpointMark visible for testing
    
    * Remove SolaceRecordCoder and take advantage of @DefaultSchema
---
 sdks/java/io/solace/build.gradle                   |   8 +
 .../org/apache/beam/sdk/io/solace/SolaceIO.java    | 415 ++++++++++--
 .../io/solace/broker/SessionServiceFactory.java    |  19 +
 .../org/apache/beam/sdk/io/solace/data/Solace.java |   4 +
 .../sdk/io/solace/MockEmptySessionService.java}    |  37 +-
 .../apache/beam/sdk/io/solace/MockSempClient.java  |  87 +++
 .../beam/sdk/io/solace/MockSempClientFactory.java} |  29 +-
 .../beam/sdk/io/solace/MockSessionService.java     |  88 +++
 .../sdk/io/solace/MockSessionServiceFactory.java}  |  29 +-
 .../apache/beam/sdk/io/solace/SolaceIOTest.java    | 597 +++++++++++++++++
 .../beam/sdk/io/solace/data/SolaceDataUtils.java   | 708 +++++++++++++++++++++
 11 files changed, 1945 insertions(+), 76 deletions(-)

diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle
index c317b566618..506145f3529 100644
--- a/sdks/java/io/solace/build.gradle
+++ b/sdks/java/io/solace/build.gradle
@@ -35,8 +35,16 @@ dependencies {
     implementation library.java.slf4j_api
     implementation library.java.joda_time
     implementation library.java.solace
+    implementation library.java.vendored_guava_32_1_2_jre
     implementation project(":sdks:java:extensions:avro")
     implementation library.java.vendored_grpc_1_60_1
     implementation library.java.avro
     permitUnusedDeclared library.java.avro
+    implementation library.java.vendored_grpc_1_60_1
+
+    testImplementation library.java.junit
+    testImplementation project(path: ":sdks:java:io:common", configuration: 
"testRuntimeMigration")
+    testImplementation project(path: ":sdks:java:testing:test-utils", 
configuration: "testRuntimeMigration")
+    testRuntimeOnly library.java.slf4j_jdk14
+    testRuntimeOnly project(path: ":runners:direct-java", configuration: 
"shadow")
 }
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
index ca8cd615ac6..e6b0dd34b18 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
@@ -17,26 +17,196 @@
  */
 package org.apache.beam.sdk.io.solace;
 
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.Destination;
 import com.solacesystems.jcsmp.JCSMPFactory;
 import com.solacesystems.jcsmp.Queue;
 import com.solacesystems.jcsmp.Topic;
+import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.solace.broker.SempClientFactory;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
 import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
 import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.io.solace.data.Solace.SolaceRecordMapper;
+import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * A {@link PTransform} to read and write from/to <a 
href="https://solace.com/";>Solace</a> event
+ * broker.
+ *
+ * <p>Note: Internal use only; this API is beta and subject to change.
+ *
+ * <h2>Reading from Solace</h2>
+ *
+ * To read from Solace, use the {@link SolaceIO#read()} or {@link 
SolaceIO#read(TypeDescriptor,
+ * SerializableFunction, SerializableFunction)}.
+ *
+ * <h3>No-argument {@link SolaceIO#read()} top-level method</h3>
+ *
+ * <p>This method returns a PCollection of {@link Solace.Record} objects. It 
uses a default mapper
+ * ({@link SolaceRecordMapper#map(BytesXMLMessage)}) to map from the received 
{@link
+ * BytesXMLMessage} from Solace, to the {@link Solace.Record} objects.
+ *
+ * <p>By default, it also uses a {@link BytesXMLMessage#getSenderTimestamp()} 
for watermark
+ * estimation. This {@link SerializableFunction} can be overridden with {@link
+ * Read#withTimestampFn(SerializableFunction)} method.
+ *
+ * <p>When using this method, the Coders are inferred automatically.
+ *
+ * <h3>Advanced {@link SolaceIO#read(TypeDescriptor, SerializableFunction, 
SerializableFunction)}
+ * top-level method</h3>
+ *
+ * <p>With this method, the user can:
+ *
+ * <ul>
+ *   <li>specify a custom output type for the PTransform (for example their 
own class consisting
+ *       only of the relevant fields, optimized for their use-case), and
+ *   <li>create a custom mapping between {@link BytesXMLMessage} and their 
output type and
+ *   <li>specify what field to use for watermark estimation from their mapped 
field (for example, in
+ *       this method the user can use a field which is encoded in the payload 
as a timestamp, which
+ *       cannot be done with the {@link SolaceIO#read()} method.
+ * </ul>
+ *
+ * <h3>Reading from a queue ({@link Read#from(Solace.Queue)}} or a topic 
({@link
+ * Read#from(Solace.Topic)})</h3>
+ *
+ * <p>Regardless of the top-level read method choice, the user can specify 
whether to read from a
+ * Queue - {@link Read#from(Solace.Queue)}, or a Topic {@link 
Read#from(Solace.Topic)}.
+ *
+ * <p>Note: when a user specifies to read from a Topic, the connector will 
create a matching Queue
+ * and a Subscription. The user must ensure that the SEMP API is reachable 
from the driver program
+ * and must provide credentials that have `write` permission to the <a
+ * href="https://docs.solace.com/Admin/SEMP/Using-SEMP.htm";>SEMP Config 
API</a>. The created Queue
+ * will be non-exclusive. The Queue will not be deleted when the pipeline is 
terminated.
+ *
+ * <p>Note: If the user specifies to read from a Queue, <a
+ * 
href="https://beam.apache.org/documentation/programming-guide/#overview";>the 
driver program</a>
+ * will execute a call to the SEMP API to check if the Queue is `exclusive` or 
`non-exclusive`. The
+ * user must ensure that the SEMP API is reachable from the driver program and 
provide credentials
+ * with `read` permission to the {@link 
Read#withSempClientFactory(SempClientFactory)}.
+ *
+ * <h3>Usage example</h3>
+ *
+ * <h4>The no-arg {@link SolaceIO#read()} method</h4>
+ *
+ * <p>The minimal example - reading from an existing Queue, using the no-arg 
{@link SolaceIO#read()}
+ * method, with all the default configuration options.
+ *
+ * <pre>{@code
+ * PCollection<Solace.Record> events =
+ *   pipeline.apply(
+ *     SolaceIO.read()
+ *         .from(Queue.fromName("your-queue-name"))
+ *         .withSempClientFactory(
+ *                 BasicAuthSempClientFactory.builder()
+ *                         .host("your-host-name-with-protocol") // e.g. 
"http://12.34.56.78:8080";
+ *                         .username("semp-username")
+ *                         .password("semp-password")
+ *                         .vpnName("vpn-name")
+ *                         .build())
+ *         .withSessionServiceFactory(
+ *                 BasicAuthJcsmpSessionServiceFactory.builder()
+ *                         .host("your-host-name")
+ *                               // e.g. "12.34.56.78", or "[fe80::1]", or 
"12.34.56.78:4444"
+ *                         .username("username")
+ *                         .password("password")
+ *                         .vpnName("vpn-name")
+ *                         .build()));
+ * }</pre>
+ *
+ * <h4>The advanced {@link SolaceIO#read(TypeDescriptor, SerializableFunction,
+ * SerializableFunction)} method</h4>
+ *
+ * <p>When using this method you can specify a custom output PCollection type 
and a custom timestamp
+ * function.
+ *
+ * <pre>{@code
+ * @DefaultSchema(JavaBeanSchema.class)
+ * public static class SimpleRecord {
+ *    public String payload;
+ *    public String messageId;
+ *    public Instant timestamp;
+ *
+ *    public SimpleRecord() {}
+ *
+ *    public SimpleRecord(String payload, String messageId, Instant timestamp) 
{
+ *        this.payload = payload;
+ *        this.messageId = messageId;
+ *        this.timestamp = timestamp;
+ *    }
+ * }
+ *
+ * private static SimpleRecord toSimpleRecord(BytesXMLMessage record) {
+ *    if (record == null) {
+ *        return null;
+ *    }
+ *    return new SimpleRecord(
+ *            new String(record.getBytes(), StandardCharsets.UTF_8),
+ *            record.getApplicationMessageId(),
+ *            record.getSenderTimestamp() != null
+ *                    ? Instant.ofEpochMilli(record.getSenderTimestamp())
+ *                    : Instant.now());
+ * }
+ *
+ * PCollection<SimpleRecord> events =
+ *  pipeline.apply(
+ *      SolaceIO.read(
+ *                      TypeDescriptor.of(SimpleRecord.class),
+ *                      record -> toSimpleRecord(record),
+ *                      record -> record.timestamp)
+ *              .from(Topic.fromName("your-topic-name"))
+ *              .withSempClientFactory(...)
+ *              .withSessionServiceFactory(...);
+ *
+ *
+ * }</pre>
+ *
+ * <h3>Authentication</h3>
+ *
+ * <p>When reading from Solace, the user must use {@link
+ * Read#withSessionServiceFactory(SessionServiceFactory)} to create a JCSMP 
session and {@link
+ * Read#withSempClientFactory(SempClientFactory)} to authenticate to the SEMP 
API.
+ *
+ * <p>See {@link Read#withSessionServiceFactory(SessionServiceFactory)} for 
session authentication.
+ * The connector provides implementation of the {@link SessionServiceFactory} 
using the Basic
+ * Authentication: {@link 
org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService}.
+ *
+ * <p>For the authentication to the SEMP API ({@link 
Read#withSempClientFactory(SempClientFactory)})
+ * the connector provides {@link 
org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} to
+ * authenticate using the Basic Authentication.
+ */
+@Internal
 public class SolaceIO {
 
+  public static final SerializableFunction<Solace.Record, Instant> 
SENDER_TIMESTAMP_FUNCTION =
+      (record) -> {
+        Long senderTimestamp = record != null ? record.getSenderTimestamp() : 
null;
+        if (senderTimestamp != null) {
+          return Instant.ofEpochMilli(senderTimestamp);
+        } else {
+          return Instant.now();
+        }
+      };
   private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;
 
   /** Get a {@link Topic} object from the topic name. */
@@ -49,17 +219,84 @@ public class SolaceIO {
     return JCSMPFactory.onlyInstance().createQueue(queueName);
   }
 
-  @AutoValue
-  public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
+  /**
+   * Convert to a JCSMP destination from a schema-enabled {@link
+   * org.apache.beam.sdk.io.solace.data.Solace.Destination}.
+   *
+   * <p>This method returns a {@link Destination}, which may be either a 
{@link Topic} or a {@link
+   * Queue}
+   */
+  public static Destination convertToJcsmpDestination(Solace.Destination 
destination) {
+    if (destination.getType().equals(Solace.DestinationType.TOPIC)) {
+      return topicFromName(checkNotNull(destination.getName()));
+    } else if (destination.getType().equals(Solace.DestinationType.QUEUE)) {
+      return queueFromName(checkNotNull(destination.getName()));
+    } else {
+      throw new IllegalArgumentException(
+          "SolaceIO.Write: Unknown destination type: " + 
destination.getType());
+    }
+  }
+
+  /**
+   * Create a {@link Read} transform, to read from Solace. The ingested 
records will be mapped to
+   * the {@link Solace.Record} objects.
+   */
+  public static Read<Solace.Record> read() {
+    return new Read<Solace.Record>(
+        Read.Configuration.<Solace.Record>builder()
+            .setTypeDescriptor(TypeDescriptor.of(Solace.Record.class))
+            .setParseFn(SolaceRecordMapper::map)
+            .setTimestampFn(SENDER_TIMESTAMP_FUNCTION)
+            .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS));
+  }
+  /**
+   * Create a {@link Read} transform, to read from Solace. Specify a {@link 
SerializableFunction} to
+   * map incoming {@link BytesXMLMessage} records, to the object of your 
choice. You also need to
+   * specify a {@link TypeDescriptor} for your class and the timestamp 
function which returns an
+   * {@link Instant} from the record.
+   *
+   * <p>The type descriptor will be used to infer a coder from CoderRegistry 
or Schema Registry. You
+   * can initialize a new TypeDescriptor in the following manner:
+   *
+   * <pre>{@code
+   * TypeDescriptor<T> typeDescriptor = 
TypeDescriptor.of(YourOutputType.class);
+   * }</pre>
+   */
+  public static <T> Read<T> read(
+      TypeDescriptor<T> typeDescriptor,
+      SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn,
+      SerializableFunction<T, Instant> timestampFn) {
+    checkState(typeDescriptor != null, "SolaceIO.Read: typeDescriptor must not 
be null");
+    checkState(parseFn != null, "SolaceIO.Read: parseFn must not be null");
+    checkState(timestampFn != null, "SolaceIO.Read: timestampFn must not be 
null");
+    return new Read<T>(
+        Read.Configuration.<T>builder()
+            .setTypeDescriptor(typeDescriptor)
+            .setParseFn(parseFn)
+            .setTimestampFn(timestampFn)
+            .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS));
+  }
+
+  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Read.class);
+
+    @VisibleForTesting final Configuration.Builder<T> configurationBuilder;
+
+    public Read(Configuration.Builder<T> configurationBuilder) {
+      this.configurationBuilder = configurationBuilder;
+    }
 
     /** Set the queue name to read from. Use this or the `from(Topic)` method. 
*/
     public Read<T> from(Solace.Queue queue) {
-      return toBuilder().setQueue(queueFromName(queue.getName())).build();
+      configurationBuilder.setQueue(queueFromName(queue.getName()));
+      return this;
     }
 
     /** Set the topic name to read from. Use this or the `from(Queue)` method. 
*/
     public Read<T> from(Solace.Topic topic) {
-      return toBuilder().setTopic(topicFromName(topic.getName())).build();
+      configurationBuilder.setTopic(topicFromName(topic.getName()));
+      return this;
     }
 
     /**
@@ -74,9 +311,10 @@ public class SolaceIO {
     public Read<T> withTimestampFn(SerializableFunction<T, Instant> 
timestampFn) {
       checkState(
           timestampFn != null,
-          "SolaceIO.Read: timestamp function must be set or use the"
-              + " `Read.readSolaceRecords()` method");
-      return toBuilder().setTimestampFn(timestampFn).build();
+          "SolaceIO.Read: timestampFn must not be null. This function must be 
set or "
+              + "use the no-argument `Read.read()` method");
+      configurationBuilder.setTimestampFn(timestampFn);
+      return this;
     }
 
     /**
@@ -87,7 +325,8 @@ public class SolaceIO {
      * `desiredNumberOfSplits` is set by the runner.
      */
     public Read<T> withMaxNumConnections(Integer maxNumConnections) {
-      return toBuilder().setMaxNumConnections(maxNumConnections).build();
+      configurationBuilder.setMaxNumConnections(maxNumConnections);
+      return this;
     }
 
     /**
@@ -97,7 +336,8 @@ public class SolaceIO {
      * which is always set by Solace.
      */
     public Read<T> withDeduplicateRecords(boolean deduplicateRecords) {
-      return toBuilder().setDeduplicateRecords(deduplicateRecords).build();
+      configurationBuilder.setDeduplicateRecords(deduplicateRecords);
+      return this;
     }
 
     /**
@@ -134,7 +374,8 @@ public class SolaceIO {
      */
     public Read<T> withSempClientFactory(SempClientFactory sempClientFactory) {
       checkState(sempClientFactory != null, "SolaceIO.Read: sempClientFactory 
must not be null.");
-      return toBuilder().setSempClientFactory(sempClientFactory).build();
+      configurationBuilder.setSempClientFactory(sempClientFactory);
+      return this;
     }
 
     /**
@@ -175,63 +416,157 @@ public class SolaceIO {
     public Read<T> withSessionServiceFactory(SessionServiceFactory 
sessionServiceFactory) {
       checkState(
           sessionServiceFactory != null, "SolaceIO.Read: sessionServiceFactory 
must not be null.");
-      return 
toBuilder().setSessionServiceFactory(sessionServiceFactory).build();
+      configurationBuilder.setSessionServiceFactory(sessionServiceFactory);
+      return this;
     }
 
-    abstract @Nullable Queue getQueue();
+    @AutoValue
+    abstract static class Configuration<T> {
 
-    abstract @Nullable Topic getTopic();
+      abstract @Nullable Queue getQueue();
 
-    abstract @Nullable SerializableFunction<T, Instant> getTimestampFn();
+      abstract @Nullable Topic getTopic();
 
-    abstract @Nullable Integer getMaxNumConnections();
+      abstract SerializableFunction<T, Instant> getTimestampFn();
 
-    abstract boolean getDeduplicateRecords();
+      abstract @Nullable Integer getMaxNumConnections();
 
-    abstract SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> 
getParseFn();
+      abstract boolean getDeduplicateRecords();
 
-    abstract @Nullable SempClientFactory getSempClientFactory();
+      abstract SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> 
getParseFn();
 
-    abstract @Nullable SessionServiceFactory getSessionServiceFactory();
+      abstract SempClientFactory getSempClientFactory();
 
-    abstract TypeDescriptor<T> getTypeDescriptor();
+      abstract SessionServiceFactory getSessionServiceFactory();
 
-    public static <T> Builder<T> builder() {
-      Builder<T> builder = new 
org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read.Builder<T>();
-      builder.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS);
-      return builder;
-    }
+      abstract TypeDescriptor<T> getTypeDescriptor();
 
-    abstract Builder<T> toBuilder();
+      public static <T> Builder<T> builder() {
+        Builder<T> builder =
+            new 
org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration.Builder<T>();
+        return builder;
+      }
 
-    @AutoValue.Builder
-    public abstract static class Builder<T> {
+      @AutoValue.Builder
+      public abstract static class Builder<T> {
 
-      abstract Builder<T> setQueue(Queue queue);
+        abstract Builder<T> setQueue(Queue queue);
 
-      abstract Builder<T> setTopic(Topic topic);
+        abstract Builder<T> setTopic(Topic topic);
 
-      abstract Builder<T> setTimestampFn(SerializableFunction<T, Instant> 
timestampFn);
+        abstract Builder<T> setTimestampFn(SerializableFunction<T, Instant> 
timestampFn);
 
-      abstract Builder<T> setMaxNumConnections(Integer maxNumConnections);
+        abstract Builder<T> setMaxNumConnections(Integer maxNumConnections);
 
-      abstract Builder<T> setDeduplicateRecords(boolean deduplicateRecords);
+        abstract Builder<T> setDeduplicateRecords(boolean deduplicateRecords);
 
-      abstract Builder<T> setParseFn(
-          SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> 
parseFn);
+        abstract Builder<T> setParseFn(
+            SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> 
parseFn);
 
-      abstract Builder<T> setSempClientFactory(SempClientFactory 
brokerServiceFactory);
+        abstract Builder<T> setSempClientFactory(SempClientFactory 
brokerServiceFactory);
 
-      abstract Builder<T> setSessionServiceFactory(SessionServiceFactory 
sessionServiceFactory);
+        abstract Builder<T> setSessionServiceFactory(SessionServiceFactory 
sessionServiceFactory);
 
-      abstract Builder<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor);
+        abstract Builder<T> setTypeDescriptor(TypeDescriptor<T> 
typeDescriptor);
 
-      abstract Read<T> build();
+        abstract Configuration<T> build();
+      }
+    }
+
+    @Override
+    public void validate(@Nullable PipelineOptions options) {
+      Configuration<T> configuration = configurationBuilder.build();
+      checkState(
+          (configuration.getQueue() == null ^ configuration.getTopic() == 
null),
+          "SolaceIO.Read: One of the Solace {Queue, Topic} must be set.");
     }
 
     @Override
     public PCollection<T> expand(PBegin input) {
-      throw new UnsupportedOperationException("");
+      Configuration<T> configuration = configurationBuilder.build();
+      SempClientFactory sempClientFactory = 
configuration.getSempClientFactory();
+      String jobName = input.getPipeline().getOptions().getJobName();
+      Queue initializedQueue =
+          initializeQueueForTopicIfNeeded(
+              configuration.getQueue(), configuration.getTopic(), jobName, 
sempClientFactory);
+
+      SessionServiceFactory sessionServiceFactory = 
configuration.getSessionServiceFactory();
+      sessionServiceFactory.setQueue(initializedQueue);
+
+      Coder<T> coder = inferCoder(input.getPipeline(), 
configuration.getTypeDescriptor());
+
+      return input.apply(
+          org.apache.beam.sdk.io.Read.from(
+              new UnboundedSolaceSource<>(
+                  initializedQueue,
+                  sempClientFactory,
+                  sessionServiceFactory,
+                  configuration.getMaxNumConnections(),
+                  configuration.getDeduplicateRecords(),
+                  coder,
+                  configuration.getTimestampFn(),
+                  configuration.getParseFn())));
+    }
+
+    @VisibleForTesting
+    Coder<T> inferCoder(Pipeline pipeline, TypeDescriptor<T> typeDescriptor) {
+      Coder<T> coderFromCoderRegistry = getFromCoderRegistry(pipeline, 
typeDescriptor);
+      if (coderFromCoderRegistry != null) {
+        return coderFromCoderRegistry;
+      }
+
+      Coder<T> coderFromSchemaRegistry = getFromSchemaRegistry(pipeline, 
typeDescriptor);
+      if (coderFromSchemaRegistry != null) {
+        return coderFromSchemaRegistry;
+      }
+
+      throw new RuntimeException(
+          "SolaceIO.Read: Cannot infer a coder for the TypeDescriptor. 
Annotate your"
+              + " output class with @DefaultSchema annotation or create a 
coder manually"
+              + " and register it in the CoderRegistry.");
+    }
+
+    private @Nullable Coder<T> getFromSchemaRegistry(
+        Pipeline pipeline, TypeDescriptor<T> typeDescriptor) {
+      try {
+        return pipeline.getSchemaRegistry().getSchemaCoder(typeDescriptor);
+      } catch (NoSuchSchemaException e) {
+        return null;
+      }
+    }
+
+    private @Nullable Coder<T> getFromCoderRegistry(
+        Pipeline pipeline, TypeDescriptor<T> typeDescriptor) {
+      try {
+        return pipeline.getCoderRegistry().getCoder(typeDescriptor);
+      } catch (CannotProvideCoderException e) {
+        return null;
+      }
+    }
+
+    private Queue initializeQueueForTopicIfNeeded(
+        @Nullable Queue queue,
+        @Nullable Topic topic,
+        String jobName,
+        SempClientFactory sempClientFactory) {
+      Queue initializedQueue;
+      if (queue != null) {
+        return queue;
+      } else {
+        String queueName = String.format("queue-%s-%s", topic, jobName);
+        try {
+          String topicName = checkNotNull(topic).getName();
+          initializedQueue = 
sempClientFactory.create().createQueueForTopic(queueName, topicName);
+          LOG.warn(
+              "SolaceIO.Read: A new queue {} was created. The Queue will not 
be"
+                  + " deleted when this job finishes. Make sure to remove it 
yourself"
+                  + " when not needed.",
+              initializedQueue.getName());
+          return initializedQueue;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
     }
   }
 }
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
index 9b4ef99eba7..7d1dee7a118 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.sdk.io.solace.broker;
 
+import com.solacesystems.jcsmp.Queue;
 import java.io.Serializable;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * This abstract class serves as a blueprint for creating `SessionService` 
objects. It introduces a
@@ -25,9 +27,26 @@ import java.io.Serializable;
  */
 public abstract class SessionServiceFactory implements Serializable {
 
+  /**
+   * A reference to a Queue object. This is set when the pipline is 
constructed (in the {@link
+   * 
org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)}
 method).
+   * This could be used to associate the created SessionService with a 
specific queue for message
+   * handling.
+   */
+  @Nullable Queue queue;
+
   /**
    * This is the core method that subclasses must implement. It defines how to 
construct and return
    * a SessionService object.
    */
   public abstract SessionService create();
+
+  /**
+   * This method is called in the {@link
+   * 
org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)}
 method
+   * to set the Queue reference.
+   */
+  public void setQueue(Queue queue) {
+    this.queue = queue;
+  }
 }
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
index 79057445a4e..97d688bff23 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
@@ -21,6 +21,8 @@ import com.google.auto.value.AutoValue;
 import com.solacesystems.jcsmp.BytesXMLMessage;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -74,6 +76,7 @@ public class Solace {
 
   /** Represents a Solace message destination (either a Topic or a Queue). */
   @AutoValue
+  @DefaultSchema(AutoValueSchema.class)
   public abstract static class Destination {
     /**
      * Gets the name of the destination.
@@ -105,6 +108,7 @@ public class Solace {
 
   /** Represents a Solace message record with its associated metadata. */
   @AutoValue
+  @DefaultSchema(AutoValueSchema.class)
   public abstract static class Record {
     /**
      * Gets the unique identifier of the message, a string for an 
application-specific message
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
similarity index 51%
copy from 
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
copy to 
sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
index 9b4ef99eba7..285c1cb8a7e 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
@@ -15,19 +15,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.io.solace.broker;
+package org.apache.beam.sdk.io.solace;
 
-import java.io.Serializable;
+import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
 
-/**
- * This abstract class serves as a blueprint for creating `SessionService` 
objects. It introduces a
- * queue property and mandates the implementation of a create() method in 
concrete subclasses.
- */
-public abstract class SessionServiceFactory implements Serializable {
+public class MockEmptySessionService implements SessionService {
+
+  String exceptionMessage = "This is an empty client, use a MockSessionService 
instead.";
+
+  @Override
+  public void close() {
+    throw new UnsupportedOperationException(exceptionMessage);
+  }
+
+  @Override
+  public boolean isClosed() {
+    throw new UnsupportedOperationException(exceptionMessage);
+  }
+
+  @Override
+  public MessageReceiver createReceiver() {
+    throw new UnsupportedOperationException(exceptionMessage);
+  }
 
-  /**
-   * This is the core method that subclasses must implement. It defines how to 
construct and return
-   * a SessionService object.
-   */
-  public abstract SessionService create();
+  @Override
+  public void connect() {
+    throw new UnsupportedOperationException(exceptionMessage);
+  }
 }
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java
new file mode 100644
index 00000000000..d4703237371
--- /dev/null
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace;
+
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.Queue;
+import java.io.IOException;
+import org.apache.beam.sdk.io.solace.broker.SempClient;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+public class MockSempClient implements SempClient {
+
+  private final SerializableFunction<String, Boolean> isQueueNonExclusiveFn;
+  private final SerializableFunction<String, Long> getBacklogBytesFn;
+  private final SerializableFunction<String, Integer> createQueueForTopicFn;
+
+  private MockSempClient(
+      SerializableFunction<String, Boolean> isQueueNonExclusiveFn,
+      SerializableFunction<String, Long> getBacklogBytesFn,
+      SerializableFunction<String, Integer> createQueueForTopicFn) {
+    this.isQueueNonExclusiveFn = isQueueNonExclusiveFn;
+    this.getBacklogBytesFn = getBacklogBytesFn;
+    this.createQueueForTopicFn = createQueueForTopicFn;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private SerializableFunction<String, Boolean> isQueueNonExclusiveFn = 
(queueName) -> true;
+    private SerializableFunction<String, Long> getBacklogBytesFn = (queueName) 
-> 0L;
+    private SerializableFunction<String, Integer> createQueueForTopicFn = 
(queueName) -> 0;
+
+    public Builder setIsQueueNonExclusiveFn(
+        SerializableFunction<String, Boolean> isQueueNonExclusiveFn) {
+      this.isQueueNonExclusiveFn = isQueueNonExclusiveFn;
+      return this;
+    }
+
+    public Builder setGetBacklogBytesFn(SerializableFunction<String, Long> 
getBacklogBytesFn) {
+      this.getBacklogBytesFn = getBacklogBytesFn;
+      return this;
+    }
+
+    public Builder setCreateQueueForTopicFn(
+        SerializableFunction<String, Integer> createQueueForTopicFn) {
+      this.createQueueForTopicFn = createQueueForTopicFn;
+      return this;
+    }
+
+    public MockSempClient build() {
+      return new MockSempClient(isQueueNonExclusiveFn, getBacklogBytesFn, 
createQueueForTopicFn);
+    }
+  }
+
+  @Override
+  public boolean isQueueNonExclusive(String queueName) throws IOException {
+    return isQueueNonExclusiveFn.apply(queueName);
+  }
+
+  @Override
+  public Queue createQueueForTopic(String queueName, String topicName) throws 
IOException {
+    createQueueForTopicFn.apply(queueName);
+    return JCSMPFactory.onlyInstance().createQueue(queueName);
+  }
+
+  @Override
+  public long getBacklogBytes(String queueName) throws IOException {
+    return getBacklogBytesFn.apply(queueName);
+  }
+}
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClientFactory.java
similarity index 59%
copy from 
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
copy to 
sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClientFactory.java
index 9b4ef99eba7..3e64c3d9bfe 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClientFactory.java
@@ -15,19 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.io.solace.broker;
+package org.apache.beam.sdk.io.solace;
 
-import java.io.Serializable;
+import org.apache.beam.sdk.io.solace.broker.SempClient;
+import org.apache.beam.sdk.io.solace.broker.SempClientFactory;
 
-/**
- * This abstract class serves as a blueprint for creating `SessionService` 
objects. It introduces a
- * queue property and mandates the implementation of a create() method in 
concrete subclasses.
- */
-public abstract class SessionServiceFactory implements Serializable {
+public class MockSempClientFactory implements SempClientFactory {
+  SempClient sempClient;
+
+  public MockSempClientFactory(SempClient sempClient) {
+    this.sempClient = sempClient;
+  }
+
+  public static SempClientFactory getDefaultMock() {
+    return new MockSempClientFactory(MockSempClient.builder().build());
+  }
 
-  /**
-   * This is the core method that subclasses must implement. It defines how to 
construct and return
-   * a SessionService object.
-   */
-  public abstract SessionService create();
+  @Override
+  public SempClient create() {
+    return sempClient;
+  }
 }
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
new file mode 100644
index 00000000000..7b14da138c6
--- /dev/null
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+public class MockSessionService implements SessionService {
+
+  private final SerializableFunction<Integer, BytesXMLMessage> getRecordFn;
+  private MessageReceiver messageReceiver = null;
+  private final int minMessagesReceived;
+
+  public MockSessionService(
+      SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int 
minMessagesReceived) {
+    this.getRecordFn = getRecordFn;
+    this.minMessagesReceived = minMessagesReceived;
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public boolean isClosed() {
+    return false;
+  }
+
+  @Override
+  public MessageReceiver createReceiver() {
+    if (messageReceiver == null) {
+      messageReceiver = new MockReceiver(getRecordFn, minMessagesReceived);
+    }
+    return messageReceiver;
+  }
+
+  @Override
+  public void connect() {}
+
+  public static class MockReceiver implements MessageReceiver, Serializable {
+    private final AtomicInteger counter = new AtomicInteger();
+    private final SerializableFunction<Integer, BytesXMLMessage> getRecordFn;
+    private final int minMessagesReceived;
+
+    public MockReceiver(
+        SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int 
minMessagesReceived) {
+      this.getRecordFn = getRecordFn;
+      this.minMessagesReceived = minMessagesReceived;
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public boolean isClosed() {
+      return false;
+    }
+
+    @Override
+    public BytesXMLMessage receive() throws IOException {
+      return getRecordFn.apply(counter.getAndIncrement());
+    }
+
+    @Override
+    public boolean isEOF() {
+      return counter.get() >= minMessagesReceived;
+    }
+  }
+}
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java
similarity index 57%
copy from 
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
copy to 
sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java
index 9b4ef99eba7..603a30ad2c9 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java
@@ -15,19 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.io.solace.broker;
+package org.apache.beam.sdk.io.solace;
 
-import java.io.Serializable;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
+import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
 
-/**
- * This abstract class serves as a blueprint for creating `SessionService` 
objects. It introduces a
- * queue property and mandates the implementation of a create() method in 
concrete subclasses.
- */
-public abstract class SessionServiceFactory implements Serializable {
+public class MockSessionServiceFactory extends SessionServiceFactory {
+  SessionService sessionService;
+
+  public MockSessionServiceFactory(SessionService clientService) {
+    this.sessionService = clientService;
+  }
+
+  public static SessionServiceFactory getDefaultMock() {
+    return new MockSessionServiceFactory(new MockEmptySessionService());
+  }
 
-  /**
-   * This is the core method that subclasses must implement. It defines how to 
construct and return
-   * a SessionService object.
-   */
-  public abstract SessionService create();
+  @Override
+  public SessionService create() {
+    return sessionService;
+  }
 }
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java
new file mode 100644
index 00000000000..bd9d5d401b5
--- /dev/null
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.Queue;
+import com.solacesystems.jcsmp.Topic;
+import com.solacesystems.jcsmp.impl.ReplicationGroupMessageIdImpl;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.io.solace.SolaceIO.Read;
+import org.apache.beam.sdk.io.solace.SolaceIO.Read.Configuration;
+import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.io.solace.data.Solace.Record;
+import org.apache.beam.sdk.io.solace.data.SolaceDataUtils;
+import org.apache.beam.sdk.io.solace.data.SolaceDataUtils.SimpleRecord;
+import org.apache.beam.sdk.io.solace.read.SolaceCheckpointMark;
+import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SolaceIOTest {
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  private Read<Record> getDefaultRead() {
+    return SolaceIO.read()
+        .from(Solace.Queue.fromName("queue"))
+        .withSempClientFactory(MockSempClientFactory.getDefaultMock())
+        .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock())
+        .withMaxNumConnections(1);
+  }
+
+  private Read<Record> getDefaultReadForTopic() {
+    return SolaceIO.read()
+        .from(Solace.Topic.fromName("topic"))
+        .withSempClientFactory(MockSempClientFactory.getDefaultMock())
+        .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock())
+        .withMaxNumConnections(1);
+  }
+
+  private static BytesXMLMessage getOrNull(Integer index, 
List<BytesXMLMessage> messages) {
+    return index != null && index < messages.size() ? messages.get(index) : 
null;
+  }
+
+  private static UnboundedSolaceSource<Record> getSource(Read<Record> spec, 
TestPipeline pipeline) {
+    Configuration<Record> configuration = spec.configurationBuilder.build();
+    return new UnboundedSolaceSource<>(
+        configuration.getQueue(),
+        configuration.getSempClientFactory(),
+        configuration.getSessionServiceFactory(),
+        configuration.getMaxNumConnections(),
+        configuration.getDeduplicateRecords(),
+        spec.inferCoder(pipeline, configuration.getTypeDescriptor()),
+        configuration.getTimestampFn(),
+        configuration.getParseFn());
+  }
+
+  @Test
+  public void testReadMessages() {
+    // Broker that creates input data
+    MockSessionService mockClientService =
+        new MockSessionService(
+            index -> {
+              List<BytesXMLMessage> messages =
+                  ImmutableList.of(
+                      SolaceDataUtils.getBytesXmlMessage("payload_test0", 
"450"),
+                      SolaceDataUtils.getBytesXmlMessage("payload_test1", 
"451"),
+                      SolaceDataUtils.getBytesXmlMessage("payload_test2", 
"452"));
+              return getOrNull(index, messages);
+            },
+            3);
+
+    SessionServiceFactory fakeSessionServiceFactory =
+        new MockSessionServiceFactory(mockClientService);
+
+    // Expected data
+    List<Solace.Record> expected = new ArrayList<>();
+    expected.add(SolaceDataUtils.getSolaceRecord("payload_test0", "450"));
+    expected.add(SolaceDataUtils.getSolaceRecord("payload_test1", "451"));
+    expected.add(SolaceDataUtils.getSolaceRecord("payload_test2", "452"));
+
+    // Run the pipeline
+    PCollection<Solace.Record> events =
+        pipeline.apply(
+            "Read from Solace",
+            
getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory));
+
+    // Assert results
+    PAssert.that(events).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  @Test
+  public void testReadMessagesWithDeduplication() {
+    // Broker that creates input data
+    MockSessionService mockClientService =
+        new MockSessionService(
+            index -> {
+              List<BytesXMLMessage> messages =
+                  ImmutableList.of(
+                      SolaceDataUtils.getBytesXmlMessage("payload_test0", 
"450"),
+                      SolaceDataUtils.getBytesXmlMessage("payload_test1", 
"451"),
+                      SolaceDataUtils.getBytesXmlMessage("payload_test2", 
"451"));
+              return getOrNull(index, messages);
+            },
+            3);
+
+    SessionServiceFactory fakeSessionServiceFactory =
+        new MockSessionServiceFactory(mockClientService);
+
+    // Expected data
+    List<Solace.Record> expected = new ArrayList<>();
+    expected.add(SolaceDataUtils.getSolaceRecord("payload_test0", "450"));
+    expected.add(SolaceDataUtils.getSolaceRecord("payload_test1", "451"));
+
+    // Run the pipeline
+    PCollection<Solace.Record> events =
+        pipeline.apply(
+            "Read from Solace",
+            getDefaultRead()
+                .withSessionServiceFactory(fakeSessionServiceFactory)
+                .withDeduplicateRecords(true));
+    // Assert results
+    PAssert.that(events).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  @Test
+  public void testReadMessagesWithoutDeduplication() {
+    // Broker that creates input data
+    MockSessionService mockClientService =
+        new MockSessionService(
+            index -> {
+              List<BytesXMLMessage> messages =
+                  ImmutableList.of(
+                      SolaceDataUtils.getBytesXmlMessage("payload_test0", 
"450"),
+                      SolaceDataUtils.getBytesXmlMessage("payload_test1", 
"451"),
+                      SolaceDataUtils.getBytesXmlMessage("payload_test2", 
"451"));
+              return getOrNull(index, messages);
+            },
+            3);
+    SessionServiceFactory fakeSessionServiceFactory =
+        new MockSessionServiceFactory(mockClientService);
+
+    // Expected data
+    List<Solace.Record> expected = new ArrayList<>();
+    expected.add(SolaceDataUtils.getSolaceRecord("payload_test0", "450"));
+    expected.add(SolaceDataUtils.getSolaceRecord("payload_test1", "451"));
+    expected.add(SolaceDataUtils.getSolaceRecord("payload_test2", "451"));
+
+    // Run the pipeline
+
+    PCollection<Solace.Record> events =
+        pipeline.apply(
+            "Read from Solace",
+            
getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory));
+    // Assert results
+    PAssert.that(events).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  @Test
+  public void testReadMessagesWithDeduplicationOnReplicationGroupMessageId() {
+    // Broker that creates input data
+    MockSessionService mockClientService =
+        new MockSessionService(
+            index -> {
+              List<BytesXMLMessage> messages =
+                  ImmutableList.of(
+                      SolaceDataUtils.getBytesXmlMessage(
+                          "payload_test0", null, null, new 
ReplicationGroupMessageIdImpl(2L, 1L)),
+                      SolaceDataUtils.getBytesXmlMessage(
+                          "payload_test1", null, null, new 
ReplicationGroupMessageIdImpl(2L, 2L)),
+                      SolaceDataUtils.getBytesXmlMessage(
+                          "payload_test2", null, null, new 
ReplicationGroupMessageIdImpl(2L, 2L)));
+              return getOrNull(index, messages);
+            },
+            3);
+
+    SessionServiceFactory fakeSessionServiceFactory =
+        new MockSessionServiceFactory(mockClientService);
+
+    // Expected data
+    List<Solace.Record> expected = new ArrayList<>();
+    expected.add(
+        SolaceDataUtils.getSolaceRecord(
+            "payload_test0", null, new ReplicationGroupMessageIdImpl(2L, 1L)));
+    expected.add(
+        SolaceDataUtils.getSolaceRecord(
+            "payload_test1", null, new ReplicationGroupMessageIdImpl(2L, 2L)));
+
+    // Run the pipeline
+    PCollection<Solace.Record> events =
+        pipeline.apply(
+            "Read from Solace",
+            getDefaultRead()
+                .withSessionServiceFactory(fakeSessionServiceFactory)
+                .withDeduplicateRecords(true));
+    // Assert results
+    PAssert.that(events).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  @Test
+  public void testReadWithCoderAndParseFnAndTimestampFn() {
+    // Broker that creates input data
+    MockSessionService mockClientService =
+        new MockSessionService(
+            index -> {
+              List<BytesXMLMessage> messages =
+                  ImmutableList.of(
+                      SolaceDataUtils.getBytesXmlMessage("payload_test0", 
"450"),
+                      SolaceDataUtils.getBytesXmlMessage("payload_test1", 
"451"),
+                      SolaceDataUtils.getBytesXmlMessage("payload_test2", 
"452"));
+              return getOrNull(index, messages);
+            },
+            3);
+    SessionServiceFactory fakeSessionServiceFactory =
+        new MockSessionServiceFactory(mockClientService);
+
+    // Expected data
+    List<SimpleRecord> expected = new ArrayList<>();
+    expected.add(new SimpleRecord("payload_test0", "450"));
+    expected.add(new SimpleRecord("payload_test1", "451"));
+    expected.add(new SimpleRecord("payload_test2", "452"));
+
+    // Run the pipeline
+    PCollection<SimpleRecord> events =
+        pipeline.apply(
+            "Read from Solace",
+            SolaceIO.read(
+                    TypeDescriptor.of(SimpleRecord.class),
+                    input ->
+                        new SimpleRecord(
+                            new String(input.getBytes(), 
StandardCharsets.UTF_8),
+                            input.getApplicationMessageId()),
+                    input -> Instant.ofEpochMilli(1708100477061L))
+                .from(Solace.Queue.fromName("queue"))
+                .withSempClientFactory(MockSempClientFactory.getDefaultMock())
+                .withSessionServiceFactory(fakeSessionServiceFactory)
+                .withMaxNumConnections(1));
+
+    // Assert results
+    PAssert.that(events).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  @Test
+  public void testNoQueueAndTopicSet() {
+    Read<Record> spec = SolaceIO.read();
+    assertThrows(IllegalStateException.class, () -> 
spec.validate(pipeline.getOptions()));
+  }
+
+  @Test
+  public void testSplitsForExclusiveQueue() throws Exception {
+    MockSempClient mockSempClient =
+        MockSempClient.builder().setIsQueueNonExclusiveFn((q) -> 
false).build();
+
+    Read<Record> spec =
+        SolaceIO.read()
+            .from(Solace.Queue.fromName("queue"))
+            .withSempClientFactory(new MockSempClientFactory(mockSempClient))
+            
.withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock());
+
+    int desiredNumSplits = 5;
+
+    UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
+    List<UnboundedSolaceSource<Record>> splits =
+        initialSource.split(desiredNumSplits, PipelineOptionsFactory.create());
+    assertEquals(1, splits.size());
+  }
+
+  @Test
+  public void testSplitsForNonExclusiveQueueWithMaxNumConnections() throws 
Exception {
+    Read<Record> spec = getDefaultRead().withMaxNumConnections(3);
+
+    int desiredNumSplits = 5;
+
+    UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
+    List<UnboundedSolaceSource<Record>> splits =
+        initialSource.split(desiredNumSplits, PipelineOptionsFactory.create());
+    assertEquals(3, splits.size());
+  }
+
+  @Test
+  public void 
testSplitsForNonExclusiveQueueWithMaxNumConnectionsRespectDesired() throws 
Exception {
+    Read<Record> spec = getDefaultRead().withMaxNumConnections(10);
+    int desiredNumSplits = 5;
+
+    UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
+    List<UnboundedSolaceSource<Record>> splits =
+        initialSource.split(desiredNumSplits, PipelineOptionsFactory.create());
+    assertEquals(5, splits.size());
+  }
+
+  @Test
+  public void testCreateQueueForTopic() {
+    AtomicInteger createQueueForTopicFnCounter = new AtomicInteger(0);
+    MockSempClient mockSempClient =
+        MockSempClient.builder()
+            .setCreateQueueForTopicFn((q) -> 
createQueueForTopicFnCounter.incrementAndGet())
+            .build();
+
+    Read<Record> spec =
+        getDefaultReadForTopic().withSempClientFactory(new 
MockSempClientFactory(mockSempClient));
+    spec.expand(PBegin.in(TestPipeline.create()));
+    // check if createQueueForTopic was executed
+    assertEquals(1, createQueueForTopicFnCounter.get());
+  }
+
+  @Test
+  public void testCheckpointMark() throws Exception {
+    AtomicInteger countConsumedMessages = new AtomicInteger(0);
+    AtomicInteger countAckMessages = new AtomicInteger(0);
+
+    // Broker that creates input data
+    MockSessionService mockClientService =
+        new MockSessionService(
+            index -> {
+              List<BytesXMLMessage> messages = new ArrayList<>();
+              for (int i = 0; i < 10; i++) {
+                messages.add(
+                    SolaceDataUtils.getBytesXmlMessage(
+                        "payload_test" + i, "45" + i, (num) -> 
countAckMessages.incrementAndGet()));
+              }
+              countConsumedMessages.incrementAndGet();
+              return getOrNull(index, messages);
+            },
+            10);
+
+    SessionServiceFactory fakeSessionServiceFactory =
+        new MockSessionServiceFactory(mockClientService);
+    Read<Record> spec = 
getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory);
+
+    UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
+    UnboundedReader<Record> reader =
+        initialSource.createReader(PipelineOptionsFactory.create(), null);
+
+    // start the reader and move to the first record
+    assertTrue(reader.start());
+
+    // consume 3 messages (NB: start already consumed the first message)
+    for (int i = 0; i < 3; i++) {
+      assertTrue(String.format("Failed at %d-th message", i), 
reader.advance());
+    }
+
+    // check if 4 messages were consumed
+    assertEquals(4, countConsumedMessages.get());
+
+    // check if no messages were acknowledged yet
+    assertEquals(0, countAckMessages.get());
+
+    // finalize the checkpoint
+    reader.getCheckpointMark().finalizeCheckpoint();
+
+    // check if messages were acknowledged
+    assertEquals(4, countAckMessages.get());
+  }
+
+  @Test
+  public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
+    AtomicInteger countConsumedMessages = new AtomicInteger(0);
+    AtomicInteger countAckMessages = new AtomicInteger(0);
+
+    // Broker that creates input data
+    MockSessionService mockClientService =
+        new MockSessionService(
+            index -> {
+              List<BytesXMLMessage> messages = new ArrayList<>();
+              for (int i = 0; i < 10; i++) {
+                messages.add(
+                    SolaceDataUtils.getBytesXmlMessage(
+                        "payload_test" + i, "45" + i, (num) -> 
countAckMessages.incrementAndGet()));
+              }
+              countConsumedMessages.incrementAndGet();
+              return getOrNull(index, messages);
+            },
+            10);
+    SessionServiceFactory fakeSessionServiceFactory =
+        new MockSessionServiceFactory(mockClientService);
+
+    Read<Record> spec =
+        getDefaultRead()
+            .withSessionServiceFactory(fakeSessionServiceFactory)
+            .withMaxNumConnections(4);
+
+    UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
+
+    UnboundedReader<Record> reader =
+        initialSource.createReader(PipelineOptionsFactory.create(), null);
+
+    // start the reader and move to the first record
+    assertTrue(reader.start());
+
+    // consume 3 messages (NB: start already consumed the first message)
+    for (int i = 0; i < 3; i++) {
+      assertTrue(String.format("Failed at %d-th message", i), 
reader.advance());
+    }
+
+    // create checkpoint but don't finalize yet
+    CheckpointMark checkpointMark = reader.getCheckpointMark();
+
+    // consume 2 more messages
+    reader.advance();
+    reader.advance();
+
+    // check if messages are still not acknowledged
+    assertEquals(0, countAckMessages.get());
+
+    // acknowledge from the first checkpoint
+    checkpointMark.finalizeCheckpoint();
+
+    // only messages from the first checkpoint are acknowledged
+    assertEquals(4, countAckMessages.get());
+  }
+
+  @Test
+  public void testCheckpointMarkSafety() throws Exception {
+
+    final int messagesToProcess = 100;
+
+    AtomicInteger countConsumedMessages = new AtomicInteger(0);
+    AtomicInteger countAckMessages = new AtomicInteger(0);
+
+    // Broker that creates input data
+    MockSessionService mockClientService =
+        new MockSessionService(
+            index -> {
+              List<BytesXMLMessage> messages = new ArrayList<>();
+              for (int i = 0; i < messagesToProcess; i++) {
+                messages.add(
+                    SolaceDataUtils.getBytesXmlMessage(
+                        "payload_test" + i, "45" + i, (num) -> 
countAckMessages.incrementAndGet()));
+              }
+              countConsumedMessages.incrementAndGet();
+              return getOrNull(index, messages);
+            },
+            10);
+
+    SessionServiceFactory fakeSessionServiceFactory =
+        new MockSessionServiceFactory(mockClientService);
+    Read<Record> spec =
+        getDefaultRead()
+            .withSessionServiceFactory(fakeSessionServiceFactory)
+            .withMaxNumConnections(4);
+
+    UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
+
+    UnboundedReader<Record> reader =
+        initialSource.createReader(PipelineOptionsFactory.create(), null);
+
+    // start the reader and move to the first record
+    assertTrue(reader.start());
+
+    // consume half the messages (NB: start already consumed the first message)
+    for (int i = 0; i < (messagesToProcess / 2) - 1; i++) {
+      assertTrue(reader.advance());
+    }
+
+    // the messages are still pending in the queue (no ACK yet)
+    assertEquals(0, countAckMessages.get());
+
+    // we finalize the checkpoint for the already-processed messages while 
simultaneously
+    // consuming the remainder of messages from the queue
+    Thread runner =
+        new Thread(
+            () -> {
+              try {
+                for (int i = 0; i < messagesToProcess / 2; i++) {
+                  assertTrue(reader.advance());
+                }
+              } catch (IOException ex) {
+                throw new RuntimeException(ex);
+              }
+            });
+    runner.start();
+    reader.getCheckpointMark().finalizeCheckpoint();
+
+    // Concurrency issues would cause an exception to be thrown before this 
method exits,
+    // failing the test
+    runner.join();
+  }
+
+  @Test
+  public void testDefaultCoder() {
+    Coder<SolaceCheckpointMark> coder =
+        new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, 
null)
+            .getCheckpointMarkCoder();
+    CoderProperties.coderSerializable(coder);
+  }
+
+  @Test
+  public void testDestinationTopicQueueCreation() {
+    String topicName = "some-topic";
+    String queueName = "some-queue";
+    Topic topic = SolaceIO.topicFromName(topicName);
+    Queue queue = SolaceIO.queueFromName(queueName);
+
+    Destination dest = topic;
+    assertTrue(dest instanceof Topic);
+    assertFalse(dest instanceof Queue);
+    assertEquals(topicName, dest.getName());
+
+    dest = queue;
+    assertTrue(dest instanceof Queue);
+    assertFalse(dest instanceof Topic);
+    assertEquals(queueName, dest.getName());
+
+    Record r = SolaceDataUtils.getSolaceRecord("payload_test0", "450");
+    dest = SolaceIO.convertToJcsmpDestination(r.getDestination());
+    assertTrue(dest instanceof Topic);
+    assertFalse(dest instanceof Queue);
+  }
+
+  @Test
+  public void testTopicEncoding() {
+    MockSessionService mockClientService =
+        new MockSessionService(
+            index -> {
+              List<BytesXMLMessage> messages =
+                  ImmutableList.of(
+                      SolaceDataUtils.getBytesXmlMessage("payload_test0", 
"450"),
+                      SolaceDataUtils.getBytesXmlMessage("payload_test1", 
"451"),
+                      SolaceDataUtils.getBytesXmlMessage("payload_test2", 
"452"));
+              return getOrNull(index, messages);
+            },
+            3);
+
+    SessionServiceFactory fakeSessionServiceFactory =
+        new MockSessionServiceFactory(mockClientService);
+
+    // Run
+    PCollection<Solace.Record> events =
+        pipeline.apply(
+            "Read from Solace",
+            
getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory));
+
+    // Run the pipeline
+    PCollection<Boolean> destAreTopics =
+        events.apply(
+            MapElements.into(TypeDescriptors.booleans())
+                .via(
+                    r -> {
+                      Destination dest = 
SolaceIO.convertToJcsmpDestination(r.getDestination());
+                      return dest instanceof Topic;
+                    }));
+
+    List<Boolean> expected = ImmutableList.of(true, true, true);
+
+    // Assert results
+    PAssert.that(destAreTopics).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+}
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
new file mode 100644
index 00000000000..2e953150c6d
--- /dev/null
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
@@ -0,0 +1,708 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.data;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.MessageType;
+import com.solacesystems.jcsmp.ReplicationGroupMessageId;
+import com.solacesystems.jcsmp.SDTMap;
+import com.solacesystems.jcsmp.User_Cos;
+import com.solacesystems.jcsmp.impl.ReplicationGroupMessageIdImpl;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.schemas.JavaBeanSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SolaceDataUtils {
+  public static final ReplicationGroupMessageId DEFAULT_REPLICATION_GROUP_ID =
+      new ReplicationGroupMessageIdImpl(1L, 136L);
+
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class SimpleRecord {
+    public String payload;
+    public String messageId;
+
+    public SimpleRecord() {}
+
+    public SimpleRecord(String payload, String messageId) {
+      this.payload = payload;
+      this.messageId = messageId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof SimpleRecord)) {
+        return false;
+      }
+      SimpleRecord that = (SimpleRecord) o;
+      return Objects.equals(payload, that.payload) && 
Objects.equals(messageId, that.messageId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(payload, messageId);
+    }
+
+    @Override
+    public String toString() {
+      return "SimpleRecord{"
+          + "payload='"
+          + payload
+          + '\''
+          + ", messageId='"
+          + messageId
+          + '\''
+          + '}';
+    }
+  }
+
+  public static Solace.Record getSolaceRecord(String payload, String 
messageId) {
+    return getSolaceRecord(payload, messageId, null);
+  }
+
+  public static Solace.Record getSolaceRecord(
+      String payload,
+      String messageId,
+      @Nullable ReplicationGroupMessageId replicationGroupMessageId) {
+    String replicationGroupMessageIdString =
+        replicationGroupMessageId != null
+            ? replicationGroupMessageId.toString()
+            : DEFAULT_REPLICATION_GROUP_ID.toString();
+
+    return Solace.Record.builder()
+        .setPayload(ByteString.copyFrom(payload, StandardCharsets.UTF_8))
+        .setMessageId(messageId)
+        .setDestination(
+            Solace.Destination.builder()
+                .setName("destination-topic")
+                .setType(Solace.DestinationType.TOPIC)
+                .build())
+        .setExpiration(1000L)
+        .setPriority(0)
+        .setReceiveTimestamp(1708100477067L)
+        .setRedelivered(false)
+        .setReplyTo(null)
+        .setSequenceNumber(null)
+        .setTimeToLive(1000L)
+        .setSenderTimestamp(null)
+        .setReplicationGroupMessageId(replicationGroupMessageIdString)
+        .setAttachmentBytes(ByteString.EMPTY)
+        .build();
+  }
+
+  public static BytesXMLMessage getBytesXmlMessage(String payload, String 
messageId) {
+    return getBytesXmlMessage(payload, messageId, null, null);
+  }
+
+  public static BytesXMLMessage getBytesXmlMessage(
+      String payload, String messageId, SerializableFunction<Integer, Integer> 
ackMessageFn) {
+    return getBytesXmlMessage(payload, messageId, ackMessageFn, null);
+  }
+
+  public static BytesXMLMessage getBytesXmlMessage(
+      String payload,
+      String messageId,
+      SerializableFunction<Integer, Integer> ackMessageFn,
+      ReplicationGroupMessageId replicationGroupMessageId) {
+    long receiverTimestamp = 1708100477067L;
+    long expiration = 1000L;
+    long timeToLive = 1000L;
+    String destination = "destination-topic";
+
+    ReplicationGroupMessageId useReplicationGroupId =
+        replicationGroupMessageId != null
+            ? replicationGroupMessageId
+            : DEFAULT_REPLICATION_GROUP_ID;
+    return new BytesXMLMessage() {
+
+      @Override
+      public byte[] getBytes() {
+        return payload.getBytes(StandardCharsets.UTF_8);
+      }
+
+      @Override
+      public int getContentLength() {
+        return payload.getBytes(StandardCharsets.UTF_8).length;
+      }
+
+      @Override
+      public int readBytes(byte[] arg0) {
+        return 0;
+      }
+
+      @Override
+      public int readBytes(byte[] arg0, int arg1) {
+        return 0;
+      }
+
+      @Override
+      public void rewindContent() {}
+
+      @Override
+      public void writeBytes(byte[] arg0) {}
+
+      @Override
+      public void writeBytes(byte[] arg0, int arg1, int arg2) {}
+
+      @Override
+      public void ackMessage() {
+        if (ackMessageFn != null) {
+          ackMessageFn.apply(0);
+        }
+      }
+
+      @Override
+      public void clearAttachment() {}
+
+      @Override
+      public void clearBinaryMetadataBytes(int arg0) {}
+
+      @Override
+      public void clearContent() {}
+
+      @Override
+      public void clearQueueNameLocation() {}
+
+      @Override
+      public void clearTopicNameLocation() {}
+
+      @Override
+      public String dump() {
+        return null;
+      }
+
+      @Override
+      public String dump(int arg0) {
+        return null;
+      }
+
+      @Override
+      public long getAckMessageId() {
+        return 0;
+      }
+
+      @Override
+      public String getAppMessageID() {
+        return null;
+      }
+
+      @Override
+      public String getAppMessageType() {
+        return null;
+      }
+
+      @Override
+      public String getApplicationMessageId() {
+        return messageId;
+      }
+
+      @Override
+      public String getApplicationMessageType() {
+        return null;
+      }
+
+      @Override
+      public ByteBuffer getAttachmentByteBuffer() {
+        return null;
+      }
+
+      @Override
+      public int getAttachmentContentLength() {
+        return 0;
+      }
+
+      @Override
+      public int getBinaryMetadataContentLength(int arg0) {
+        return 0;
+      }
+
+      @Override
+      public Collection<Integer> getBinaryMetadataTypes() {
+        return null;
+      }
+
+      @Override
+      public Long getCacheRequestId() {
+        return null;
+      }
+
+      @Override
+      public List<Long> getConsumerIdList() {
+        return null;
+      }
+
+      @Override
+      public String getCorrelationId() {
+        return null;
+      }
+
+      @Override
+      public Object getCorrelationKey() {
+        return null;
+      }
+
+      @Override
+      public User_Cos getCos() {
+        return null;
+      }
+
+      @Override
+      public boolean getDeliverToOne() {
+        return false;
+      }
+
+      @Override
+      public int getDeliveryCount() throws UnsupportedOperationException {
+        return 0;
+      }
+
+      @Override
+      public DeliveryMode getDeliveryMode() {
+        return null;
+      }
+
+      @Override
+      public Destination getDestination() {
+        return JCSMPFactory.onlyInstance().createTopic(destination);
+      }
+
+      @Override
+      public String getDestinationTopicSuffix() {
+        return null;
+      }
+
+      @Override
+      public boolean getDiscardIndication() {
+        return false;
+      }
+
+      @Override
+      public long getExpiration() {
+        return expiration;
+      }
+
+      @Override
+      public String getHTTPContentEncoding() {
+        return null;
+      }
+
+      @Override
+      public String getHTTPContentType() {
+        return null;
+      }
+
+      @Override
+      public String getMessageId() {
+        return null;
+      }
+
+      @Override
+      public long getMessageIdLong() {
+        return 0;
+      }
+
+      @Override
+      public MessageType getMessageType() {
+        return null;
+      }
+
+      @Override
+      public int getPriority() {
+        return 0;
+      }
+
+      @Override
+      public SDTMap getProperties() {
+        return null;
+      }
+
+      @Override
+      public int getQueueNameLength() {
+        return 0;
+      }
+
+      @Override
+      public int getQueueNameOffset() {
+        return 0;
+      }
+
+      @Override
+      public long getReceiveTimestamp() {
+        return receiverTimestamp;
+      }
+
+      @Override
+      public boolean getRedelivered() {
+        return false;
+      }
+
+      @Override
+      public ReplicationGroupMessageId getReplicationGroupMessageId() {
+        // this is always set by Solace
+        return useReplicationGroupId;
+      }
+
+      @Override
+      public Destination getReplyTo() {
+        return null;
+      }
+
+      @Override
+      public String getReplyToSuffix() {
+        return null;
+      }
+
+      @Override
+      public Long getSendTimestamp() {
+        return null;
+      }
+
+      @Override
+      public String getSenderID() {
+        return null;
+      }
+
+      @Override
+      public String getSenderId() {
+        return null;
+      }
+
+      @Override
+      public Long getSenderTimestamp() {
+        return null;
+      }
+
+      @Override
+      public Long getSequenceNumber() {
+        return null;
+      }
+
+      @Override
+      public byte getStructuredMsgType() {
+        return 0x2;
+      }
+
+      @Override
+      public boolean getTQDiscardIndication() {
+        return false;
+      }
+
+      @Override
+      public long getTimeToLive() {
+        return timeToLive;
+      }
+
+      @Override
+      public int getTopicNameLength() {
+        return 5;
+      }
+
+      @Override
+      public int getTopicNameOffset() {
+        return 0;
+      }
+
+      @Override
+      public Long getTopicSequenceNumber() {
+        return null;
+      }
+
+      @Override
+      public byte[] getUserData() {
+        return null;
+      }
+
+      @Override
+      public boolean hasAttachment() {
+        return false;
+      }
+
+      @Override
+      public boolean hasBinaryMetadata(int arg0) {
+        return false;
+      }
+
+      @Override
+      public boolean hasContent() {
+        return false;
+      }
+
+      @Override
+      public boolean hasUserData() {
+        return false;
+      }
+
+      @Override
+      public boolean isAckImmediately() {
+        return false;
+      }
+
+      @Override
+      public boolean isCacheMessage() {
+        return false;
+      }
+
+      @Override
+      public boolean isDMQEligible() {
+        return false;
+      }
+
+      @Override
+      public boolean isDeliveryCountSupported() {
+        return false;
+      }
+
+      @Override
+      public boolean isElidingEligible() {
+        return false;
+      }
+
+      @Override
+      public boolean isReadOnly() {
+        return false;
+      }
+
+      @Override
+      public boolean isReplyMessage() {
+        return false;
+      }
+
+      @Override
+      public boolean isStructuredMsg() {
+        return false;
+      }
+
+      @Override
+      public boolean isSuspect() {
+        return false;
+      }
+
+      @Override
+      public int readAttachmentBytes(byte[] arg0) {
+        return 0;
+      }
+
+      @Override
+      public int readAttachmentBytes(byte[] arg0, int arg1) {
+        return 0;
+      }
+
+      @Override
+      public int readAttachmentBytes(int arg0, byte[] arg1, int arg2, int 
arg3) {
+        return 0;
+      }
+
+      @Override
+      public int readBinaryMetadataBytes(int arg0, byte[] arg1) {
+        return 0;
+      }
+
+      @Override
+      public int readContentBytes(byte[] arg0) {
+        return 0;
+      }
+
+      @Override
+      public int readContentBytes(byte[] arg0, int arg1) {
+        return 0;
+      }
+
+      @Override
+      public int readContentBytes(int arg0, byte[] arg1, int arg2, int arg3) {
+        return 0;
+      }
+
+      @Override
+      public void rejectMessage() {}
+
+      @Override
+      public void reset() {}
+
+      @Override
+      public void resetPayload() {}
+
+      @Override
+      public void rewindAttachment() {}
+
+      @Override
+      public void setAckImmediately(boolean arg0) {}
+
+      @Override
+      public void setAppMessageID(String arg0) {}
+
+      @Override
+      public void setAppMessageType(String arg0) {}
+
+      @Override
+      public void setApplicationMessageId(String arg0) {}
+
+      @Override
+      public void setApplicationMessageType(String arg0) {}
+
+      @Override
+      public void setAsReplyMessage(boolean arg0) {}
+
+      @Override
+      public void setCorrelationId(String arg0) {}
+
+      @Override
+      public void setCorrelationKey(Object arg0) {}
+
+      @Override
+      public void setCos(User_Cos arg0) {}
+
+      @Override
+      public void setDMQEligible(boolean arg0) {}
+
+      @Override
+      public void setDeliverToOne(boolean arg0) {}
+
+      @Override
+      public void setDeliveryMode(DeliveryMode arg0) {}
+
+      @Override
+      public void setElidingEligible(boolean arg0) {}
+
+      @Override
+      public void setExpiration(long arg0) {}
+
+      @Override
+      public void setHTTPContentEncoding(String arg0) {}
+
+      @Override
+      public void setHTTPContentType(String arg0) {}
+
+      @Override
+      public void setMessageType(MessageType arg0) {}
+
+      @Override
+      public void setPriority(int arg0) {}
+
+      @Override
+      public void setProperties(SDTMap arg0) {}
+
+      @Override
+      public void setQueueNameLocation(int arg0, int arg1) {}
+
+      @Override
+      public void setReadOnly() {}
+
+      @Override
+      public void setReplyTo(Destination arg0) {}
+
+      @Override
+      public void setReplyToSuffix(String arg0) {}
+
+      @Override
+      public void setSendTimestamp(long arg0) {}
+
+      @Override
+      public void setSenderID(String arg0) {}
+
+      @Override
+      public void setSenderId(String arg0) {}
+
+      @Override
+      public void setSenderTimestamp(long arg0) {}
+
+      @Override
+      public void setSequenceNumber(long arg0) {}
+
+      @Override
+      public void setStructuredMsg(boolean arg0) {}
+
+      @Override
+      public void setStructuredMsgType(byte arg0) {}
+
+      @Override
+      public void setTimeToLive(long arg0) {}
+
+      @Override
+      public void setTopicNameLocation(int arg0, int arg1) {}
+
+      @Override
+      public void setUserData(byte[] arg0) {}
+
+      @Override
+      public void settle(Outcome arg0) throws JCSMPException {}
+
+      @Override
+      public int writeAttachment(byte[] arg0) {
+        return 0;
+      }
+
+      @Override
+      public int writeAttachment(InputStream arg0) throws IOException {
+        return 0;
+      }
+
+      @Override
+      public int writeAttachment(byte[] arg0, int arg1, int arg2) throws 
BufferUnderflowException {
+        return 0;
+      }
+
+      @Override
+      public int writeBinaryMetadataBytes(int arg0, byte[] arg1) {
+        return 0;
+      }
+
+      @Override
+      public int writeBinaryMetadataBytes(int arg0, byte[] arg1, int arg2, int 
arg3)
+          throws BufferUnderflowException {
+        return 0;
+      }
+
+      @Override
+      public int writeNewAttachment(byte[] arg0) {
+        return 0;
+      }
+
+      @Override
+      public int writeNewAttachment(InputStream arg0) throws IOException {
+        return 0;
+      }
+
+      @Override
+      public int writeNewAttachment(byte[] arg0, int arg1, int arg2)
+          throws BufferUnderflowException {
+        return 0;
+      }
+
+      @Override
+      public int writeNewAttachment(InputStream arg0, int arg1, int arg2) 
throws IOException {
+        return 0;
+      }
+    };
+  }
+}

Reply via email to