This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 18af8c837fa Solace Read connector (#31476)
18af8c837fa is described below
commit 18af8c837faf63aaee66f7071aef63835a9b9dc0
Author: Bartosz Zablocki <[email protected]>
AuthorDate: Fri Jun 21 21:47:56 2024 +0200
Solace Read connector (#31476)
* wip solace connector
* wip solace connector
* some checker errors resolved
* all checker errors resolved
* improving unit tests
* respond to PR comments
* Documentation
* Small refactor - move data classes out of the client
* refactor
* Add github action for integration test of Solace
* testing github workflow
* bump testcontainers to 1.19.7 - solace semp was updated with an admin
user access
* Use FlowHandle to acknowledge messages to make SolaceCheckpointMark's
fields serializable.
* Handle StaleSessionException error
* Add @Internal annotation to mark the SolaceIO API beta and subject to
change.
* Improve documentation
* Back to ack based on bytesxmlmessages. Deduplicate default to false.
* update changes.md with Solace read connector
* remove ack by id code
* remove todo comment
* Add licenses to package-info.java files
* Restructure documentation
* update aws test after upgrading testcontainers version.
* Disable publishing docs until the first pass on the master branch
* Remove files from this branch to split PR into smaller chunks
* refactor tests for readability
* revert upgrade of testcontainers - not needed in this PR chunk
* revert upgrade of testcontainers - not needed in this PR chunk
* spotless
* remove IT tests from this pr
* Tech Writer review
* Add a field to Solace.Record mapped from
BytesXMLMessage.getAttachmentByteBuffer()
* Add and fix some documentation
* Remove CheckpointMark's reference to the UnboundedSolaceReader -
unnecessary.
* Revert "Remove CheckpointMark's reference to the UnboundedSolaceReader -
unnecessary."
This reverts commit 2e1c10e0b4c0f124af779ee4f284fcc79ccc8fc9.
* Solace project init - github workflow file, gradle module
* Splitting the #31476 - Leaving only PTransform AutoValue configurations
* remove unnecessary dependencies
* remove info from CHANGES.md
* Add watermark-related code
* Remove excessive @Nullable annotations on Solace.Record class
* Remove entry from CHANGES.md
* Make Record builder package-private
* Refactor SolaceIO - the constructor of Read takes a configuration builder
as an argument
* Change payload and attachments type to immutable ByteString
* Downgrade Record builders access modifiers to package private
* Add documentation
* Add documentation to classes and methods in Solace.java
* typo
* Add SolaceCheckpointMark.java
* Make SolaceCheckpointMark visible for testing
* Remove SolaceRecordCoder and take advantage of @DefaultSchema
---
sdks/java/io/solace/build.gradle | 8 +
.../org/apache/beam/sdk/io/solace/SolaceIO.java | 415 ++++++++++--
.../io/solace/broker/SessionServiceFactory.java | 19 +
.../org/apache/beam/sdk/io/solace/data/Solace.java | 4 +
.../sdk/io/solace/MockEmptySessionService.java} | 37 +-
.../apache/beam/sdk/io/solace/MockSempClient.java | 87 +++
.../beam/sdk/io/solace/MockSempClientFactory.java} | 29 +-
.../beam/sdk/io/solace/MockSessionService.java | 88 +++
.../sdk/io/solace/MockSessionServiceFactory.java} | 29 +-
.../apache/beam/sdk/io/solace/SolaceIOTest.java | 597 +++++++++++++++++
.../beam/sdk/io/solace/data/SolaceDataUtils.java | 708 +++++++++++++++++++++
11 files changed, 1945 insertions(+), 76 deletions(-)
diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle
index c317b566618..506145f3529 100644
--- a/sdks/java/io/solace/build.gradle
+++ b/sdks/java/io/solace/build.gradle
@@ -35,8 +35,16 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.solace
+ implementation library.java.vendored_guava_32_1_2_jre
implementation project(":sdks:java:extensions:avro")
implementation library.java.vendored_grpc_1_60_1
implementation library.java.avro
permitUnusedDeclared library.java.avro
+ implementation library.java.vendored_grpc_1_60_1
+
+ testImplementation library.java.junit
+ testImplementation project(path: ":sdks:java:io:common", configuration:
"testRuntimeMigration")
+ testImplementation project(path: ":sdks:java:testing:test-utils",
configuration: "testRuntimeMigration")
+ testRuntimeOnly library.java.slf4j_jdk14
+ testRuntimeOnly project(path: ":runners:direct-java", configuration:
"shadow")
}
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
index ca8cd615ac6..e6b0dd34b18 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
@@ -17,26 +17,196 @@
*/
package org.apache.beam.sdk.io.solace;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.Destination;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.Topic;
+import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.solace.broker.SempClientFactory;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.io.solace.data.Solace.SolaceRecordMapper;
+import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * A {@link PTransform} to read and write from/to <a
href="https://solace.com/">Solace</a> event
+ * broker.
+ *
+ * <p>Note: Internal use only; this API is beta and subject to change.
+ *
+ * <h2>Reading from Solace</h2>
+ *
+ * To read from Solace, use the {@link SolaceIO#read()} or {@link
SolaceIO#read(TypeDescriptor,
+ * SerializableFunction, SerializableFunction)}.
+ *
+ * <h3>No-argument {@link SolaceIO#read()} top-level method</h3>
+ *
+ * <p>This method returns a PCollection of {@link Solace.Record} objects. It
uses a default mapper
+ * ({@link SolaceRecordMapper#map(BytesXMLMessage)}) to map from the received
{@link
+ * BytesXMLMessage} from Solace, to the {@link Solace.Record} objects.
+ *
+ * <p>By default, it also uses a {@link BytesXMLMessage#getSenderTimestamp()}
for watermark
+ * estimation. This {@link SerializableFunction} can be overridden with {@link
+ * Read#withTimestampFn(SerializableFunction)} method.
+ *
+ * <p>When using this method, the Coders are inferred automatically.
+ *
+ * <h3>Advanced {@link SolaceIO#read(TypeDescriptor, SerializableFunction,
SerializableFunction)}
+ * top-level method</h3>
+ *
+ * <p>With this method, the user can:
+ *
+ * <ul>
+ * <li>specify a custom output type for the PTransform (for example their
own class consisting
+ * only of the relevant fields, optimized for their use-case), and
+ * <li>create a custom mapping between {@link BytesXMLMessage} and their
output type and
+ * <li>specify what field to use for watermark estimation from their mapped
field (for example, in
+ * this method the user can use a field which is encoded in the payload
as a timestamp, which
+ * cannot be done with the {@link SolaceIO#read()} method).
+ * </ul>
+ *
+ * <h3>Reading from a queue ({@link Read#from(Solace.Queue)}) or a topic
({@link
+ * Read#from(Solace.Topic)})</h3>
+ *
+ * <p>Regardless of the top-level read method choice, the user can specify
whether to read from a
+ * Queue - {@link Read#from(Solace.Queue)}, or a Topic {@link
Read#from(Solace.Topic)}.
+ *
+ * <p>Note: when a user specifies to read from a Topic, the connector will
create a matching Queue
+ * and a Subscription. The user must ensure that the SEMP API is reachable
from the driver program
+ * and must provide credentials that have `write` permission to the <a
+ * href="https://docs.solace.com/Admin/SEMP/Using-SEMP.htm">SEMP Config
API</a>. The created Queue
+ * will be non-exclusive. The Queue will not be deleted when the pipeline is
terminated.
+ *
+ * <p>Note: If the user specifies to read from a Queue, <a
+ *
href="https://beam.apache.org/documentation/programming-guide/#overview">the
driver program</a>
+ * will execute a call to the SEMP API to check if the Queue is `exclusive` or
`non-exclusive`. The
+ * user must ensure that the SEMP API is reachable from the driver program and
provide credentials
+ * with `read` permission to the {@link
Read#withSempClientFactory(SempClientFactory)}.
+ *
+ * <h3>Usage example</h3>
+ *
+ * <h4>The no-arg {@link SolaceIO#read()} method</h4>
+ *
+ * <p>The minimal example - reading from an existing Queue, using the no-arg
{@link SolaceIO#read()}
+ * method, with all the default configuration options.
+ *
+ * <pre>{@code
+ * PCollection<Solace.Record> events =
+ * pipeline.apply(
+ * SolaceIO.read()
+ * .from(Queue.fromName("your-queue-name"))
+ * .withSempClientFactory(
+ * BasicAuthSempClientFactory.builder()
+ * .host("your-host-name-with-protocol") // e.g.
"http://12.34.56.78:8080"
+ * .username("semp-username")
+ * .password("semp-password")
+ * .vpnName("vpn-name")
+ * .build())
+ * .withSessionServiceFactory(
+ * BasicAuthJcsmpSessionServiceFactory.builder()
+ * .host("your-host-name")
+ * // e.g. "12.34.56.78", or "[fe80::1]", or
"12.34.56.78:4444"
+ * .username("username")
+ * .password("password")
+ * .vpnName("vpn-name")
+ * .build()));
+ * }</pre>
+ *
+ * <h4>The advanced {@link SolaceIO#read(TypeDescriptor, SerializableFunction,
+ * SerializableFunction)} method</h4>
+ *
+ * <p>When using this method you can specify a custom output PCollection type
and a custom timestamp
+ * function.
+ *
+ * <pre>{@code
+ * @DefaultSchema(JavaBeanSchema.class)
+ * public static class SimpleRecord {
+ * public String payload;
+ * public String messageId;
+ * public Instant timestamp;
+ *
+ * public SimpleRecord() {}
+ *
+ * public SimpleRecord(String payload, String messageId, Instant timestamp)
{
+ * this.payload = payload;
+ * this.messageId = messageId;
+ * this.timestamp = timestamp;
+ * }
+ * }
+ *
+ * private static SimpleRecord toSimpleRecord(BytesXMLMessage record) {
+ * if (record == null) {
+ * return null;
+ * }
+ * return new SimpleRecord(
+ * new String(record.getBytes(), StandardCharsets.UTF_8),
+ * record.getApplicationMessageId(),
+ * record.getSenderTimestamp() != null
+ * ? Instant.ofEpochMilli(record.getSenderTimestamp())
+ * : Instant.now());
+ * }
+ *
+ * PCollection<SimpleRecord> events =
+ * pipeline.apply(
+ * SolaceIO.read(
+ * TypeDescriptor.of(SimpleRecord.class),
+ * record -> toSimpleRecord(record),
+ * record -> record.timestamp)
+ * .from(Topic.fromName("your-topic-name"))
+ * .withSempClientFactory(...)
+ * .withSessionServiceFactory(...);
+ *
+ *
+ * }</pre>
+ *
+ * <h3>Authentication</h3>
+ *
+ * <p>When reading from Solace, the user must use {@link
+ * Read#withSessionServiceFactory(SessionServiceFactory)} to create a JCSMP
session and {@link
+ * Read#withSempClientFactory(SempClientFactory)} to authenticate to the SEMP
API.
+ *
+ * <p>See {@link Read#withSessionServiceFactory(SessionServiceFactory)} for
session authentication.
+ * The connector provides an implementation of the {@link SessionServiceFactory}
using the Basic
+ * Authentication: {@link
org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService}.
+ *
+ * <p>For the authentication to the SEMP API ({@link
Read#withSempClientFactory(SempClientFactory)})
+ * the connector provides {@link
org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} to
+ * authenticate using the Basic Authentication.
+ */
+@Internal
public class SolaceIO {
+ public static final SerializableFunction<Solace.Record, Instant>
SENDER_TIMESTAMP_FUNCTION =
+ (record) -> {
+ Long senderTimestamp = record != null ? record.getSenderTimestamp() :
null;
+ if (senderTimestamp != null) {
+ return Instant.ofEpochMilli(senderTimestamp);
+ } else {
+ return Instant.now();
+ }
+ };
private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;
/** Get a {@link Topic} object from the topic name. */
@@ -49,17 +219,84 @@ public class SolaceIO {
return JCSMPFactory.onlyInstance().createQueue(queueName);
}
- @AutoValue
- public abstract static class Read<T> extends PTransform<PBegin,
PCollection<T>> {
+ /**
+ * Convert to a JCSMP destination from a schema-enabled {@link
+ * org.apache.beam.sdk.io.solace.data.Solace.Destination}.
+ *
+ * <p>This method returns a {@link Destination}, which may be either a
{@link Topic} or a {@link
+ * Queue}
+ */
+ public static Destination convertToJcsmpDestination(Solace.Destination
destination) {
+ if (destination.getType().equals(Solace.DestinationType.TOPIC)) {
+ return topicFromName(checkNotNull(destination.getName()));
+ } else if (destination.getType().equals(Solace.DestinationType.QUEUE)) {
+ return queueFromName(checkNotNull(destination.getName()));
+ } else {
+ throw new IllegalArgumentException(
+ "SolaceIO.Write: Unknown destination type: " +
destination.getType());
+ }
+ }
+
+ /**
+ * Create a {@link Read} transform, to read from Solace. The ingested
records will be mapped to
+ * the {@link Solace.Record} objects.
+ */
+ public static Read<Solace.Record> read() {
+ return new Read<Solace.Record>(
+ Read.Configuration.<Solace.Record>builder()
+ .setTypeDescriptor(TypeDescriptor.of(Solace.Record.class))
+ .setParseFn(SolaceRecordMapper::map)
+ .setTimestampFn(SENDER_TIMESTAMP_FUNCTION)
+ .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS));
+ }
+ /**
+ * Create a {@link Read} transform, to read from Solace. Specify a {@link
SerializableFunction} to
+ * map incoming {@link BytesXMLMessage} records, to the object of your
choice. You also need to
+ * specify a {@link TypeDescriptor} for your class and the timestamp
function which returns an
+ * {@link Instant} from the record.
+ *
+ * <p>The type descriptor will be used to infer a coder from CoderRegistry
or Schema Registry. You
+ * can initialize a new TypeDescriptor in the following manner:
+ *
+ * <pre>{@code
+ * TypeDescriptor<T> typeDescriptor =
TypeDescriptor.of(YourOutputType.class);
+ * }</pre>
+ */
+ public static <T> Read<T> read(
+ TypeDescriptor<T> typeDescriptor,
+ SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn,
+ SerializableFunction<T, Instant> timestampFn) {
+ checkState(typeDescriptor != null, "SolaceIO.Read: typeDescriptor must not
be null");
+ checkState(parseFn != null, "SolaceIO.Read: parseFn must not be null");
+ checkState(timestampFn != null, "SolaceIO.Read: timestampFn must not be
null");
+ return new Read<T>(
+ Read.Configuration.<T>builder()
+ .setTypeDescriptor(typeDescriptor)
+ .setParseFn(parseFn)
+ .setTimestampFn(timestampFn)
+ .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS));
+ }
+
+ public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Read.class);
+
+ @VisibleForTesting final Configuration.Builder<T> configurationBuilder;
+
+ public Read(Configuration.Builder<T> configurationBuilder) {
+ this.configurationBuilder = configurationBuilder;
+ }
/** Set the queue name to read from. Use this or the `from(Topic)` method.
*/
public Read<T> from(Solace.Queue queue) {
- return toBuilder().setQueue(queueFromName(queue.getName())).build();
+ configurationBuilder.setQueue(queueFromName(queue.getName()));
+ return this;
}
/** Set the topic name to read from. Use this or the `from(Queue)` method.
*/
public Read<T> from(Solace.Topic topic) {
- return toBuilder().setTopic(topicFromName(topic.getName())).build();
+ configurationBuilder.setTopic(topicFromName(topic.getName()));
+ return this;
}
/**
@@ -74,9 +311,10 @@ public class SolaceIO {
public Read<T> withTimestampFn(SerializableFunction<T, Instant>
timestampFn) {
checkState(
timestampFn != null,
- "SolaceIO.Read: timestamp function must be set or use the"
- + " `Read.readSolaceRecords()` method");
- return toBuilder().setTimestampFn(timestampFn).build();
+ "SolaceIO.Read: timestampFn must not be null. This function must be
set or "
+ + "use the no-argument `Read.read()` method");
+ configurationBuilder.setTimestampFn(timestampFn);
+ return this;
}
/**
@@ -87,7 +325,8 @@ public class SolaceIO {
* `desiredNumberOfSplits` is set by the runner.
*/
public Read<T> withMaxNumConnections(Integer maxNumConnections) {
- return toBuilder().setMaxNumConnections(maxNumConnections).build();
+ configurationBuilder.setMaxNumConnections(maxNumConnections);
+ return this;
}
/**
@@ -97,7 +336,8 @@ public class SolaceIO {
* which is always set by Solace.
*/
public Read<T> withDeduplicateRecords(boolean deduplicateRecords) {
- return toBuilder().setDeduplicateRecords(deduplicateRecords).build();
+ configurationBuilder.setDeduplicateRecords(deduplicateRecords);
+ return this;
}
/**
@@ -134,7 +374,8 @@ public class SolaceIO {
*/
public Read<T> withSempClientFactory(SempClientFactory sempClientFactory) {
checkState(sempClientFactory != null, "SolaceIO.Read: sempClientFactory
must not be null.");
- return toBuilder().setSempClientFactory(sempClientFactory).build();
+ configurationBuilder.setSempClientFactory(sempClientFactory);
+ return this;
}
/**
@@ -175,63 +416,157 @@ public class SolaceIO {
public Read<T> withSessionServiceFactory(SessionServiceFactory
sessionServiceFactory) {
checkState(
sessionServiceFactory != null, "SolaceIO.Read: sessionServiceFactory
must not be null.");
- return
toBuilder().setSessionServiceFactory(sessionServiceFactory).build();
+ configurationBuilder.setSessionServiceFactory(sessionServiceFactory);
+ return this;
}
- abstract @Nullable Queue getQueue();
+ @AutoValue
+ abstract static class Configuration<T> {
- abstract @Nullable Topic getTopic();
+ abstract @Nullable Queue getQueue();
- abstract @Nullable SerializableFunction<T, Instant> getTimestampFn();
+ abstract @Nullable Topic getTopic();
- abstract @Nullable Integer getMaxNumConnections();
+ abstract SerializableFunction<T, Instant> getTimestampFn();
- abstract boolean getDeduplicateRecords();
+ abstract @Nullable Integer getMaxNumConnections();
- abstract SerializableFunction<@Nullable BytesXMLMessage, @Nullable T>
getParseFn();
+ abstract boolean getDeduplicateRecords();
- abstract @Nullable SempClientFactory getSempClientFactory();
+ abstract SerializableFunction<@Nullable BytesXMLMessage, @Nullable T>
getParseFn();
- abstract @Nullable SessionServiceFactory getSessionServiceFactory();
+ abstract SempClientFactory getSempClientFactory();
- abstract TypeDescriptor<T> getTypeDescriptor();
+ abstract SessionServiceFactory getSessionServiceFactory();
- public static <T> Builder<T> builder() {
- Builder<T> builder = new
org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read.Builder<T>();
- builder.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS);
- return builder;
- }
+ abstract TypeDescriptor<T> getTypeDescriptor();
- abstract Builder<T> toBuilder();
+ public static <T> Builder<T> builder() {
+ Builder<T> builder =
+ new
org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration.Builder<T>();
+ return builder;
+ }
- @AutoValue.Builder
- public abstract static class Builder<T> {
+ @AutoValue.Builder
+ public abstract static class Builder<T> {
- abstract Builder<T> setQueue(Queue queue);
+ abstract Builder<T> setQueue(Queue queue);
- abstract Builder<T> setTopic(Topic topic);
+ abstract Builder<T> setTopic(Topic topic);
- abstract Builder<T> setTimestampFn(SerializableFunction<T, Instant>
timestampFn);
+ abstract Builder<T> setTimestampFn(SerializableFunction<T, Instant>
timestampFn);
- abstract Builder<T> setMaxNumConnections(Integer maxNumConnections);
+ abstract Builder<T> setMaxNumConnections(Integer maxNumConnections);
- abstract Builder<T> setDeduplicateRecords(boolean deduplicateRecords);
+ abstract Builder<T> setDeduplicateRecords(boolean deduplicateRecords);
- abstract Builder<T> setParseFn(
- SerializableFunction<@Nullable BytesXMLMessage, @Nullable T>
parseFn);
+ abstract Builder<T> setParseFn(
+ SerializableFunction<@Nullable BytesXMLMessage, @Nullable T>
parseFn);
- abstract Builder<T> setSempClientFactory(SempClientFactory
brokerServiceFactory);
+ abstract Builder<T> setSempClientFactory(SempClientFactory
brokerServiceFactory);
- abstract Builder<T> setSessionServiceFactory(SessionServiceFactory
sessionServiceFactory);
+ abstract Builder<T> setSessionServiceFactory(SessionServiceFactory
sessionServiceFactory);
- abstract Builder<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor);
+ abstract Builder<T> setTypeDescriptor(TypeDescriptor<T>
typeDescriptor);
- abstract Read<T> build();
+ abstract Configuration<T> build();
+ }
+ }
+
+ @Override
+ public void validate(@Nullable PipelineOptions options) {
+ Configuration<T> configuration = configurationBuilder.build();
+ checkState(
+ (configuration.getQueue() == null ^ configuration.getTopic() ==
null),
+ "SolaceIO.Read: One of the Solace {Queue, Topic} must be set.");
}
@Override
public PCollection<T> expand(PBegin input) {
- throw new UnsupportedOperationException("");
+ Configuration<T> configuration = configurationBuilder.build();
+ SempClientFactory sempClientFactory =
configuration.getSempClientFactory();
+ String jobName = input.getPipeline().getOptions().getJobName();
+ Queue initializedQueue =
+ initializeQueueForTopicIfNeeded(
+ configuration.getQueue(), configuration.getTopic(), jobName,
sempClientFactory);
+
+ SessionServiceFactory sessionServiceFactory =
configuration.getSessionServiceFactory();
+ sessionServiceFactory.setQueue(initializedQueue);
+
+ Coder<T> coder = inferCoder(input.getPipeline(),
configuration.getTypeDescriptor());
+
+ return input.apply(
+ org.apache.beam.sdk.io.Read.from(
+ new UnboundedSolaceSource<>(
+ initializedQueue,
+ sempClientFactory,
+ sessionServiceFactory,
+ configuration.getMaxNumConnections(),
+ configuration.getDeduplicateRecords(),
+ coder,
+ configuration.getTimestampFn(),
+ configuration.getParseFn())));
+ }
+
+ @VisibleForTesting
+ Coder<T> inferCoder(Pipeline pipeline, TypeDescriptor<T> typeDescriptor) {
+ Coder<T> coderFromCoderRegistry = getFromCoderRegistry(pipeline,
typeDescriptor);
+ if (coderFromCoderRegistry != null) {
+ return coderFromCoderRegistry;
+ }
+
+ Coder<T> coderFromSchemaRegistry = getFromSchemaRegistry(pipeline,
typeDescriptor);
+ if (coderFromSchemaRegistry != null) {
+ return coderFromSchemaRegistry;
+ }
+
+ throw new RuntimeException(
+ "SolaceIO.Read: Cannot infer a coder for the TypeDescriptor.
Annotate your"
+ + " output class with @DefaultSchema annotation or create a
coder manually"
+ + " and register it in the CoderRegistry.");
+ }
+
+ private @Nullable Coder<T> getFromSchemaRegistry(
+ Pipeline pipeline, TypeDescriptor<T> typeDescriptor) {
+ try {
+ return pipeline.getSchemaRegistry().getSchemaCoder(typeDescriptor);
+ } catch (NoSuchSchemaException e) {
+ return null;
+ }
+ }
+
+ private @Nullable Coder<T> getFromCoderRegistry(
+ Pipeline pipeline, TypeDescriptor<T> typeDescriptor) {
+ try {
+ return pipeline.getCoderRegistry().getCoder(typeDescriptor);
+ } catch (CannotProvideCoderException e) {
+ return null;
+ }
+ }
+
+ private Queue initializeQueueForTopicIfNeeded(
+ @Nullable Queue queue,
+ @Nullable Topic topic,
+ String jobName,
+ SempClientFactory sempClientFactory) {
+ Queue initializedQueue;
+ if (queue != null) {
+ return queue;
+ } else {
+ String queueName = String.format("queue-%s-%s", topic, jobName);
+ try {
+ String topicName = checkNotNull(topic).getName();
+ initializedQueue =
sempClientFactory.create().createQueueForTopic(queueName, topicName);
+ LOG.warn(
+ "SolaceIO.Read: A new queue {} was created. The Queue will not
be"
+ + " deleted when this job finishes. Make sure to remove it
yourself"
+ + " when not needed.",
+ initializedQueue.getName());
+ return initializedQueue;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
}
}
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
index 9b4ef99eba7..7d1dee7a118 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
@@ -17,7 +17,9 @@
*/
package org.apache.beam.sdk.io.solace.broker;
+import com.solacesystems.jcsmp.Queue;
import java.io.Serializable;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* This abstract class serves as a blueprint for creating `SessionService`
objects. It introduces a
@@ -25,9 +27,26 @@ import java.io.Serializable;
*/
public abstract class SessionServiceFactory implements Serializable {
+ /**
+ * A reference to a Queue object. This is set when the pipeline is
constructed (in the {@link
+ *
org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)}
method).
+ * This could be used to associate the created SessionService with a
specific queue for message
+ * handling.
+ */
+ @Nullable Queue queue;
+
/**
* This is the core method that subclasses must implement. It defines how to
construct and return
* a SessionService object.
*/
public abstract SessionService create();
+
+ /**
+ * This method is called in the {@link
+ *
org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)}
method
+ * to set the Queue reference.
+ */
+ public void setQueue(Queue queue) {
+ this.queue = queue;
+ }
}
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
index 79057445a4e..97d688bff23 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
@@ -21,6 +21,8 @@ import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
@@ -74,6 +76,7 @@ public class Solace {
/** Represents a Solace message destination (either a Topic or a Queue). */
@AutoValue
+ @DefaultSchema(AutoValueSchema.class)
public abstract static class Destination {
/**
* Gets the name of the destination.
@@ -105,6 +108,7 @@ public class Solace {
/** Represents a Solace message record with its associated metadata. */
@AutoValue
+ @DefaultSchema(AutoValueSchema.class)
public abstract static class Record {
/**
* Gets the unique identifier of the message, a string for an
application-specific message
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
similarity index 51%
copy from
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
copy to
sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
index 9b4ef99eba7..285c1cb8a7e 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
@@ -15,19 +15,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.solace.broker;
+package org.apache.beam.sdk.io.solace;
-import java.io.Serializable;
+import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
-/**
- * This abstract class serves as a blueprint for creating `SessionService`
objects. It introduces a
- * queue property and mandates the implementation of a create() method in
concrete subclasses.
- */
-public abstract class SessionServiceFactory implements Serializable {
+public class MockEmptySessionService implements SessionService {
+
+ String exceptionMessage = "This is an empty client, use a MockSessionService
instead.";
+
+ @Override
+ public void close() {
+ throw new UnsupportedOperationException(exceptionMessage);
+ }
+
+ @Override
+ public boolean isClosed() {
+ throw new UnsupportedOperationException(exceptionMessage);
+ }
+
+ @Override
+ public MessageReceiver createReceiver() {
+ throw new UnsupportedOperationException(exceptionMessage);
+ }
- /**
- * This is the core method that subclasses must implement. It defines how to
construct and return
- * a SessionService object.
- */
- public abstract SessionService create();
+ @Override
+ public void connect() {
+ throw new UnsupportedOperationException(exceptionMessage);
+ }
}
diff --git
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java
new file mode 100644
index 00000000000..d4703237371
--- /dev/null
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace;
+
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.Queue;
+import java.io.IOException;
+import org.apache.beam.sdk.io.solace.broker.SempClient;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+/**
+ * Configurable stand-in for a {@link SempClient}, intended for tests.
+ *
+ * <p>Every query is answered by a function supplied through the {@link Builder}. When a function
+ * is not supplied, queues are reported as non-exclusive, the backlog is reported as zero bytes and
+ * the queue-creation hook returns zero.
+ */
+public class MockSempClient implements SempClient {
+
+  private final SerializableFunction<String, Boolean> isQueueNonExclusiveFn;
+  private final SerializableFunction<String, Long> getBacklogBytesFn;
+  private final SerializableFunction<String, Integer> createQueueForTopicFn;
+
+  private MockSempClient(
+      SerializableFunction<String, Boolean> nonExclusiveFn,
+      SerializableFunction<String, Long> backlogBytesFn,
+      SerializableFunction<String, Integer> queueForTopicFn) {
+    isQueueNonExclusiveFn = nonExclusiveFn;
+    getBacklogBytesFn = backlogBytesFn;
+    createQueueForTopicFn = queueForTopicFn;
+  }
+
+  /** Returns a builder that starts out with the default answers. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /** Answers with whatever the configured exclusivity function returns for {@code queueName}. */
+  @Override
+  public boolean isQueueNonExclusive(String queueName) throws IOException {
+    Boolean nonExclusive = isQueueNonExclusiveFn.apply(queueName);
+    return nonExclusive;
+  }
+
+  /**
+   * Invokes the configured creation hook with {@code queueName} (its result is discarded) and
+   * returns a JCSMP queue of that name. {@code topicName} is not used.
+   */
+  @Override
+  public Queue createQueueForTopic(String queueName, String topicName) throws IOException {
+    createQueueForTopicFn.apply(queueName);
+    JCSMPFactory jcsmpFactory = JCSMPFactory.onlyInstance();
+    return jcsmpFactory.createQueue(queueName);
+  }
+
+  /** Answers with whatever the configured backlog function returns for {@code queueName}. */
+  @Override
+  public long getBacklogBytes(String queueName) throws IOException {
+    Long backlogBytes = getBacklogBytesFn.apply(queueName);
+    return backlogBytes;
+  }
+
+  /** Fluent builder for {@link MockSempClient}; every setter returns this builder. */
+  public static class Builder {
+    private SerializableFunction<String, Boolean> isQueueNonExclusiveFn = ignoredQueue -> true;
+    private SerializableFunction<String, Long> getBacklogBytesFn = ignoredQueue -> 0L;
+    private SerializableFunction<String, Integer> createQueueForTopicFn = ignoredQueue -> 0;
+
+    public Builder setIsQueueNonExclusiveFn(
+        SerializableFunction<String, Boolean> isQueueNonExclusiveFn) {
+      this.isQueueNonExclusiveFn = isQueueNonExclusiveFn;
+      return this;
+    }
+
+    public Builder setGetBacklogBytesFn(SerializableFunction<String, Long> getBacklogBytesFn) {
+      this.getBacklogBytesFn = getBacklogBytesFn;
+      return this;
+    }
+
+    public Builder setCreateQueueForTopicFn(
+        SerializableFunction<String, Integer> createQueueForTopicFn) {
+      this.createQueueForTopicFn = createQueueForTopicFn;
+      return this;
+    }
+
+    /** Creates the mock from the functions currently held by this builder. */
+    public MockSempClient build() {
+      return new MockSempClient(isQueueNonExclusiveFn, getBacklogBytesFn, createQueueForTopicFn);
+    }
+  }
+}
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClientFactory.java
similarity index 59%
copy from
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
copy to
sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClientFactory.java
index 9b4ef99eba7..3e64c3d9bfe 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClientFactory.java
@@ -15,19 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.solace.broker;
+package org.apache.beam.sdk.io.solace;
-import java.io.Serializable;
+import org.apache.beam.sdk.io.solace.broker.SempClient;
+import org.apache.beam.sdk.io.solace.broker.SempClientFactory;
-/**
- * This abstract class serves as a blueprint for creating `SessionService`
objects. It introduces a
- * queue property and mandates the implementation of a create() method in
concrete subclasses.
- */
-public abstract class SessionServiceFactory implements Serializable {
+public class MockSempClientFactory implements SempClientFactory {
+ SempClient sempClient;
+
+ /** Creates a factory whose {@link #create()} returns the given client. */
+ public MockSempClientFactory(SempClient sempClient) {
+ this.sempClient = sempClient;
+ }
+
+ /** Returns a factory wrapping a {@link MockSempClient} built with the builder's defaults. */
+ public static SempClientFactory getDefaultMock() {
+   SempClient defaultClient = MockSempClient.builder().build();
+   return new MockSempClientFactory(defaultClient);
+ }
- /**
- * This is the core method that subclasses must implement. It defines how to
construct and return
- * a SessionService object.
- */
- public abstract SessionService create();
+ /** Returns the client passed to the constructor; the same instance on every call. */
+ @Override
+ public SempClient create() {
+ return sempClient;
+ }
}
diff --git
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
new file mode 100644
index 00000000000..7b14da138c6
--- /dev/null
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+/**
+ * {@link SessionService} for tests that serves messages produced by a caller-supplied function
+ * instead of talking to a broker. {@link #connect()} and {@link #close()} do nothing and the
+ * service never reports itself as closed.
+ */
+public class MockSessionService implements SessionService {
+
+  private final SerializableFunction<Integer, BytesXMLMessage> getRecordFn;
+  private MessageReceiver messageReceiver = null;
+  private final int minMessagesReceived;
+
+  /**
+   * @param getRecordFn maps the zero-based index of a receive call to the message to hand out
+   * @param minMessagesReceived number of receive calls after which the receiver reports EOF
+   */
+  public MockSessionService(
+      SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int minMessagesReceived) {
+    this.getRecordFn = getRecordFn;
+    this.minMessagesReceived = minMessagesReceived;
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public boolean isClosed() {
+    return false;
+  }
+
+  /** Creates the receiver on the first call and returns that same receiver afterwards. */
+  @Override
+  public MessageReceiver createReceiver() {
+    if (messageReceiver != null) {
+      return messageReceiver;
+    }
+    messageReceiver = new MockReceiver(getRecordFn, minMessagesReceived);
+    return messageReceiver;
+  }
+
+  @Override
+  public void connect() {}
+
+  /** Receiver that counts receive calls and looks each message up by that count. */
+  public static class MockReceiver implements MessageReceiver, Serializable {
+    private final AtomicInteger counter = new AtomicInteger();
+    private final SerializableFunction<Integer, BytesXMLMessage> getRecordFn;
+    private final int minMessagesReceived;
+
+    public MockReceiver(
+        SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int minMessagesReceived) {
+      this.getRecordFn = getRecordFn;
+      this.minMessagesReceived = minMessagesReceived;
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public boolean isClosed() {
+      return false;
+    }
+
+    /** Hands out the message for the current call index, then advances the index by one. */
+    @Override
+    public BytesXMLMessage receive() throws IOException {
+      int callIndex = counter.getAndIncrement();
+      return getRecordFn.apply(callIndex);
+    }
+
+    /** True once at least {@code minMessagesReceived} receive calls have been made. */
+    @Override
+    public boolean isEOF() {
+      int receiveCalls = counter.get();
+      return receiveCalls >= minMessagesReceived;
+    }
+  }
+}
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java
similarity index 57%
copy from
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
copy to
sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java
index 9b4ef99eba7..603a30ad2c9 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java
@@ -15,19 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.solace.broker;
+package org.apache.beam.sdk.io.solace;
-import java.io.Serializable;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
+import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
-/**
- * This abstract class serves as a blueprint for creating `SessionService`
objects. It introduces a
- * queue property and mandates the implementation of a create() method in
concrete subclasses.
- */
-public abstract class SessionServiceFactory implements Serializable {
+public class MockSessionServiceFactory extends SessionServiceFactory {
+ SessionService sessionService;
+
+ /** Creates a factory whose {@link #create()} returns the given session service. */
+ public MockSessionServiceFactory(SessionService clientService) {
+ this.sessionService = clientService;
+ }
+
+ /** Returns a factory wrapping a {@link MockEmptySessionService}. */
+ public static SessionServiceFactory getDefaultMock() {
+   SessionService emptyService = new MockEmptySessionService();
+   return new MockSessionServiceFactory(emptyService);
+ }
- /**
- * This is the core method that subclasses must implement. It defines how to
construct and return
- * a SessionService object.
- */
- public abstract SessionService create();
+ /** Returns the session service passed to the constructor; the same instance on every call. */
+ @Override
+ public SessionService create() {
+ return sessionService;
+ }
}
diff --git
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java
new file mode 100644
index 00000000000..bd9d5d401b5
--- /dev/null
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.Queue;
+import com.solacesystems.jcsmp.Topic;
+import com.solacesystems.jcsmp.impl.ReplicationGroupMessageIdImpl;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.io.solace.SolaceIO.Read;
+import org.apache.beam.sdk.io.solace.SolaceIO.Read.Configuration;
+import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.io.solace.data.Solace.Record;
+import org.apache.beam.sdk.io.solace.data.SolaceDataUtils;
+import org.apache.beam.sdk.io.solace.data.SolaceDataUtils.SimpleRecord;
+import org.apache.beam.sdk.io.solace.read.SolaceCheckpointMark;
+import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SolaceIOTest {
+
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ /** Read from the queue named "queue", wired to the default mocks, with a single connection. */
+ private Read<Record> getDefaultRead() {
+   Read<Record> read = SolaceIO.read();
+   return read.from(Solace.Queue.fromName("queue"))
+       .withSempClientFactory(MockSempClientFactory.getDefaultMock())
+       .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock())
+       .withMaxNumConnections(1);
+ }
+
+ /** Read from the topic named "topic", wired to the default mocks, with a single connection. */
+ private Read<Record> getDefaultReadForTopic() {
+   Read<Record> read = SolaceIO.read();
+   return read.from(Solace.Topic.fromName("topic"))
+       .withSempClientFactory(MockSempClientFactory.getDefaultMock())
+       .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock())
+       .withMaxNumConnections(1);
+ }
+
+ /**
+  * Returns the message at {@code index}, or {@code null} when {@code index} is {@code null} or
+  * is not below the size of {@code messages}.
+  */
+ private static BytesXMLMessage getOrNull(Integer index, List<BytesXMLMessage> messages) {
+   if (index == null || index >= messages.size()) {
+     return null;
+   }
+   return messages.get(index);
+ }
+
+ /** Constructs an unbounded source directly from the configuration accumulated by {@code spec}. */
+ private static UnboundedSolaceSource<Record> getSource(Read<Record> spec, TestPipeline pipeline) {
+   Configuration<Record> readConfig = spec.configurationBuilder.build();
+   return new UnboundedSolaceSource<>(
+       readConfig.getQueue(),
+       readConfig.getSempClientFactory(),
+       readConfig.getSessionServiceFactory(),
+       readConfig.getMaxNumConnections(),
+       readConfig.getDeduplicateRecords(),
+       spec.inferCoder(pipeline, readConfig.getTypeDescriptor()),
+       readConfig.getTimestampFn(),
+       readConfig.getParseFn());
+ }
+
+ @Test
+ public void testReadMessages() {
+   // Fake broker serving three distinct messages. The list is built inside the lambda so the
+   // lambda itself captures nothing.
+   SessionServiceFactory sessionServiceFactory =
+       new MockSessionServiceFactory(
+           new MockSessionService(
+               index ->
+                   getOrNull(
+                       index,
+                       ImmutableList.of(
+                           SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
+                           SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
+                           SolaceDataUtils.getBytesXmlMessage("payload_test2", "452"))),
+               3));
+
+   // One record is expected per message served.
+   List<Solace.Record> expectedRecords =
+       ImmutableList.of(
+           SolaceDataUtils.getSolaceRecord("payload_test0", "450"),
+           SolaceDataUtils.getSolaceRecord("payload_test1", "451"),
+           SolaceDataUtils.getSolaceRecord("payload_test2", "452"));
+
+   PCollection<Solace.Record> records =
+       pipeline.apply(
+           "Read from Solace", getDefaultRead().withSessionServiceFactory(sessionServiceFactory));
+
+   PAssert.that(records).containsInAnyOrder(expectedRecords);
+   pipeline.run();
+ }
+
+ @Test
+ public void testReadMessagesWithDeduplication() {
+   // Fake broker: the second and third messages share the id "451".
+   SessionServiceFactory sessionServiceFactory =
+       new MockSessionServiceFactory(
+           new MockSessionService(
+               index ->
+                   getOrNull(
+                       index,
+                       ImmutableList.of(
+                           SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
+                           SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
+                           SolaceDataUtils.getBytesXmlMessage("payload_test2", "451"))),
+               3));
+
+   // With deduplication enabled, only the first message carrying "451" is expected.
+   List<Solace.Record> expectedRecords =
+       ImmutableList.of(
+           SolaceDataUtils.getSolaceRecord("payload_test0", "450"),
+           SolaceDataUtils.getSolaceRecord("payload_test1", "451"));
+
+   Read<Record> deduplicatingRead =
+       getDefaultRead()
+           .withSessionServiceFactory(sessionServiceFactory)
+           .withDeduplicateRecords(true);
+   PCollection<Solace.Record> records = pipeline.apply("Read from Solace", deduplicatingRead);
+
+   PAssert.that(records).containsInAnyOrder(expectedRecords);
+   pipeline.run();
+ }
+
+ @Test
+ public void testReadMessagesWithoutDeduplication() {
+   // Fake broker: the second and third messages share the id "451".
+   SessionServiceFactory sessionServiceFactory =
+       new MockSessionServiceFactory(
+           new MockSessionService(
+               index ->
+                   getOrNull(
+                       index,
+                       ImmutableList.of(
+                           SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
+                           SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
+                           SolaceDataUtils.getBytesXmlMessage("payload_test2", "451"))),
+               3));
+
+   // Deduplication is not switched on for this read, so all three messages are expected.
+   List<Solace.Record> expectedRecords =
+       ImmutableList.of(
+           SolaceDataUtils.getSolaceRecord("payload_test0", "450"),
+           SolaceDataUtils.getSolaceRecord("payload_test1", "451"),
+           SolaceDataUtils.getSolaceRecord("payload_test2", "451"));
+
+   PCollection<Solace.Record> records =
+       pipeline.apply(
+           "Read from Solace", getDefaultRead().withSessionServiceFactory(sessionServiceFactory));
+
+   PAssert.that(records).containsInAnyOrder(expectedRecords);
+   pipeline.run();
+ }
+
+ @Test
+ public void testReadMessagesWithDeduplicationOnReplicationGroupMessageId() {
+   // Fake broker: the second and third messages share the replication group message id (2, 2).
+   SessionServiceFactory sessionServiceFactory =
+       new MockSessionServiceFactory(
+           new MockSessionService(
+               index ->
+                   getOrNull(
+                       index,
+                       ImmutableList.of(
+                           SolaceDataUtils.getBytesXmlMessage(
+                               "payload_test0", null, null, new ReplicationGroupMessageIdImpl(2L, 1L)),
+                           SolaceDataUtils.getBytesXmlMessage(
+                               "payload_test1", null, null, new ReplicationGroupMessageIdImpl(2L, 2L)),
+                           SolaceDataUtils.getBytesXmlMessage(
+                               "payload_test2", null, null, new ReplicationGroupMessageIdImpl(2L, 2L)))),
+               3));
+
+   // With deduplication enabled, only the first message carrying (2, 2) is expected.
+   List<Solace.Record> expectedRecords =
+       ImmutableList.of(
+           SolaceDataUtils.getSolaceRecord(
+               "payload_test0", null, new ReplicationGroupMessageIdImpl(2L, 1L)),
+           SolaceDataUtils.getSolaceRecord(
+               "payload_test1", null, new ReplicationGroupMessageIdImpl(2L, 2L)));
+
+   Read<Record> deduplicatingRead =
+       getDefaultRead()
+           .withSessionServiceFactory(sessionServiceFactory)
+           .withDeduplicateRecords(true);
+   PCollection<Solace.Record> records = pipeline.apply("Read from Solace", deduplicatingRead);
+
+   PAssert.that(records).containsInAnyOrder(expectedRecords);
+   pipeline.run();
+ }
+
+ @Test
+ public void testReadWithCoderAndParseFnAndTimestampFn() {
+   // Fake broker serving three distinct messages.
+   SessionServiceFactory sessionServiceFactory =
+       new MockSessionServiceFactory(
+           new MockSessionService(
+               index ->
+                   getOrNull(
+                       index,
+                       ImmutableList.of(
+                           SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
+                           SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
+                           SolaceDataUtils.getBytesXmlMessage("payload_test2", "452"))),
+               3));
+
+   // The custom parse function below turns each message into a SimpleRecord.
+   List<SimpleRecord> expectedRecords =
+       ImmutableList.of(
+           new SimpleRecord("payload_test0", "450"),
+           new SimpleRecord("payload_test1", "451"),
+           new SimpleRecord("payload_test2", "452"));
+
+   PCollection<SimpleRecord> records =
+       pipeline.apply(
+           "Read from Solace",
+           SolaceIO.read(
+                   TypeDescriptor.of(SimpleRecord.class),
+                   message ->
+                       new SimpleRecord(
+                           new String(message.getBytes(), StandardCharsets.UTF_8),
+                           message.getApplicationMessageId()),
+                   message -> Instant.ofEpochMilli(1708100477061L))
+               .from(Solace.Queue.fromName("queue"))
+               .withSempClientFactory(MockSempClientFactory.getDefaultMock())
+               .withSessionServiceFactory(sessionServiceFactory)
+               .withMaxNumConnections(1));
+
+   PAssert.that(records).containsInAnyOrder(expectedRecords);
+   pipeline.run();
+ }
+
+ @Test
+ public void testNoQueueAndTopicSet() {
+   // A read with neither queue nor topic configured must be rejected by validation.
+   Read<Record> unconfiguredRead = SolaceIO.read();
+   assertThrows(
+       IllegalStateException.class, () -> unconfiguredRead.validate(pipeline.getOptions()));
+ }
+
+ @Test
+ public void testSplitsForExclusiveQueue() throws Exception {
+   // The SEMP mock reports every queue as exclusive.
+   MockSempClient exclusiveQueueClient =
+       MockSempClient.builder().setIsQueueNonExclusiveFn(queueName -> false).build();
+
+   Read<Record> read =
+       SolaceIO.read()
+           .from(Solace.Queue.fromName("queue"))
+           .withSempClientFactory(new MockSempClientFactory(exclusiveQueueClient))
+           .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock());
+
+   int requestedSplits = 5;
+   UnboundedSolaceSource<Record> source = getSource(read, pipeline);
+   List<UnboundedSolaceSource<Record>> actualSplits =
+       source.split(requestedSplits, PipelineOptionsFactory.create());
+
+   // Five splits were requested, but exactly one is expected for an exclusive queue.
+   assertEquals(1, actualSplits.size());
+ }
+
+ @Test
+ public void testSplitsForNonExclusiveQueueWithMaxNumConnections() throws Exception {
+   Read<Record> read = getDefaultRead().withMaxNumConnections(3);
+   int requestedSplits = 5;
+
+   UnboundedSolaceSource<Record> source = getSource(read, pipeline);
+   List<UnboundedSolaceSource<Record>> actualSplits =
+       source.split(requestedSplits, PipelineOptionsFactory.create());
+
+   // Five splits requested with at most three connections: three splits are expected.
+   assertEquals(3, actualSplits.size());
+ }
+
+ @Test
+ public void testSplitsForNonExclusiveQueueWithMaxNumConnectionsRespectDesired() throws Exception {
+   Read<Record> read = getDefaultRead().withMaxNumConnections(10);
+   int requestedSplits = 5;
+
+   UnboundedSolaceSource<Record> source = getSource(read, pipeline);
+   List<UnboundedSolaceSource<Record>> actualSplits =
+       source.split(requestedSplits, PipelineOptionsFactory.create());
+
+   // Five splits requested with up to ten connections allowed: five splits are expected.
+   assertEquals(5, actualSplits.size());
+ }
+
+ @Test
+ public void testCreateQueueForTopic() {
+   // Counts how often the SEMP mock is asked to create a queue for a topic.
+   AtomicInteger createQueueCalls = new AtomicInteger(0);
+   MockSempClient countingSempClient =
+       MockSempClient.builder()
+           .setCreateQueueForTopicFn(queueName -> createQueueCalls.incrementAndGet())
+           .build();
+
+   Read<Record> topicRead =
+       getDefaultReadForTopic()
+           .withSempClientFactory(new MockSempClientFactory(countingSempClient));
+   topicRead.expand(PBegin.in(TestPipeline.create()));
+
+   // Expanding a topic-based read must have requested the queue exactly once.
+   assertEquals(1, createQueueCalls.get());
+ }
+
+ @Test
+ public void testCheckpointMark() throws Exception {
+   AtomicInteger countConsumedMessages = new AtomicInteger(0);
+   AtomicInteger countAckMessages = new AtomicInteger(0);
+
+   // Fake broker with ten messages; every lookup is counted as a consumed message and each
+   // message reports its acknowledgement through countAckMessages.
+   MockSessionService mockClientService =
+       new MockSessionService(
+           index -> {
+             countConsumedMessages.incrementAndGet();
+             List<BytesXMLMessage> brokerMessages = new ArrayList<>();
+             for (int n = 0; n < 10; n++) {
+               brokerMessages.add(
+                   SolaceDataUtils.getBytesXmlMessage(
+                       "payload_test" + n, "45" + n, ack -> countAckMessages.incrementAndGet()));
+             }
+             return getOrNull(index, brokerMessages);
+           },
+           10);
+
+   Read<Record> read =
+       getDefaultRead()
+           .withSessionServiceFactory(new MockSessionServiceFactory(mockClientService));
+   UnboundedReader<Record> reader =
+       getSource(read, pipeline).createReader(PipelineOptionsFactory.create(), null);
+
+   // start() positions the reader on the first record; three more advances follow.
+   assertTrue(reader.start());
+   int advances = 0;
+   while (advances < 3) {
+     assertTrue(String.format("Failed at %d-th message", advances), reader.advance());
+     advances++;
+   }
+
+   // Four messages were pulled and none of them has been acknowledged so far.
+   assertEquals(4, countConsumedMessages.get());
+   assertEquals(0, countAckMessages.get());
+
+   // Finalizing the checkpoint acknowledges all four.
+   reader.getCheckpointMark().finalizeCheckpoint();
+   assertEquals(4, countAckMessages.get());
+ }
+
+ @Test
+ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
+   AtomicInteger countConsumedMessages = new AtomicInteger(0);
+   AtomicInteger countAckMessages = new AtomicInteger(0);
+
+   // Broker that creates input data
+   MockSessionService mockClientService =
+       new MockSessionService(
+           index -> {
+             List<BytesXMLMessage> messages = new ArrayList<>();
+             for (int i = 0; i < 10; i++) {
+               messages.add(
+                   SolaceDataUtils.getBytesXmlMessage(
+                       "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
+             }
+             countConsumedMessages.incrementAndGet();
+             return getOrNull(index, messages);
+           },
+           10);
+   SessionServiceFactory fakeSessionServiceFactory =
+       new MockSessionServiceFactory(mockClientService);
+
+   Read<Record> spec =
+       getDefaultRead()
+           .withSessionServiceFactory(fakeSessionServiceFactory)
+           .withMaxNumConnections(4);
+
+   UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
+
+   UnboundedReader<Record> reader =
+       initialSource.createReader(PipelineOptionsFactory.create(), null);
+
+   // start the reader and move to the first record
+   assertTrue(reader.start());
+
+   // consume 3 messages (NB: start already consumed the first message)
+   for (int i = 0; i < 3; i++) {
+     assertTrue(String.format("Failed at %d-th message", i), reader.advance());
+   }
+
+   // create checkpoint but don't finalize yet
+   CheckpointMark checkpointMark = reader.getCheckpointMark();
+
+   // Consume 2 more messages. The results are asserted: if nothing were read after the
+   // checkpoint, the final "only the first checkpoint is acknowledged" check would pass
+   // trivially.
+   assertTrue(reader.advance());
+   assertTrue(reader.advance());
+
+   // check if messages are still not acknowledged
+   assertEquals(0, countAckMessages.get());
+
+   // acknowledge from the first checkpoint
+   checkpointMark.finalizeCheckpoint();
+
+   // only messages from the first checkpoint are acknowledged
+   assertEquals(4, countAckMessages.get());
+ }
+
+ @Test
+ public void testCheckpointMarkSafety() throws Exception {
+
+   final int messagesToProcess = 100;
+
+   AtomicInteger countConsumedMessages = new AtomicInteger(0);
+   AtomicInteger countAckMessages = new AtomicInteger(0);
+
+   // Broker that creates input data
+   MockSessionService mockClientService =
+       new MockSessionService(
+           index -> {
+             List<BytesXMLMessage> messages = new ArrayList<>();
+             for (int i = 0; i < messagesToProcess; i++) {
+               messages.add(
+                   SolaceDataUtils.getBytesXmlMessage(
+                       "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
+             }
+             countConsumedMessages.incrementAndGet();
+             return getOrNull(index, messages);
+           },
+           10);
+
+   SessionServiceFactory fakeSessionServiceFactory =
+       new MockSessionServiceFactory(mockClientService);
+   Read<Record> spec =
+       getDefaultRead()
+           .withSessionServiceFactory(fakeSessionServiceFactory)
+           .withMaxNumConnections(4);
+
+   UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
+
+   UnboundedReader<Record> reader =
+       initialSource.createReader(PipelineOptionsFactory.create(), null);
+
+   // start the reader and move to the first record
+   assertTrue(reader.start());
+
+   // consume half the messages (NB: start already consumed the first message)
+   for (int i = 0; i < (messagesToProcess / 2) - 1; i++) {
+     assertTrue(reader.advance());
+   }
+
+   // the messages are still pending in the queue (no ACK yet)
+   assertEquals(0, countAckMessages.get());
+
+   // An exception or failed assertion on a background thread never reaches JUnit by itself:
+   // it only terminates that thread. The failure is therefore captured here and rethrown on
+   // the test thread once the runner has been joined (join() makes the write visible).
+   final Throwable[] runnerFailure = new Throwable[1];
+
+   // we finalize the checkpoint for the already-processed messages while simultaneously
+   // consuming the remainder of messages from the queue
+   Thread runner =
+       new Thread(
+           () -> {
+             try {
+               for (int i = 0; i < messagesToProcess / 2; i++) {
+                 assertTrue(reader.advance());
+               }
+             } catch (Throwable t) {
+               runnerFailure[0] = t;
+             }
+           });
+   runner.start();
+   try {
+     reader.getCheckpointMark().finalizeCheckpoint();
+   } finally {
+     // Always wait for the runner, even if finalization itself failed.
+     runner.join();
+   }
+
+   // Concurrency issues on either thread now fail the test.
+   if (runnerFailure[0] != null) {
+     throw new AssertionError(
+         "Reader failed while the checkpoint was being finalized", runnerFailure[0]);
+   }
+ }
+
+ @Test
+ public void testDefaultCoder() {
+   // The checkpoint mark coder is obtained from a source constructed with placeholder arguments.
+   UnboundedSolaceSource<Record> placeholderSource =
+       new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null);
+   Coder<SolaceCheckpointMark> checkpointCoder = placeholderSource.getCheckpointMarkCoder();
+   CoderProperties.coderSerializable(checkpointCoder);
+ }
+
+ @Test
+ public void testDestinationTopicQueueCreation() {
+   String topicName = "some-topic";
+   String queueName = "some-queue";
+   Topic topic = SolaceIO.topicFromName(topicName);
+   Queue queue = SolaceIO.queueFromName(queueName);
+
+   // A topic is a Destination that is a Topic, not a Queue, and keeps its name.
+   Destination topicDestination = topic;
+   assertTrue(topicDestination instanceof Topic);
+   assertFalse(topicDestination instanceof Queue);
+   assertEquals(topicName, topicDestination.getName());
+
+   // A queue is a Destination that is a Queue, not a Topic, and keeps its name.
+   Destination queueDestination = queue;
+   assertTrue(queueDestination instanceof Queue);
+   assertFalse(queueDestination instanceof Topic);
+   assertEquals(queueName, queueDestination.getName());
+
+   // The destination of a test record must convert to a JCSMP Topic.
+   Record testRecord = SolaceDataUtils.getSolaceRecord("payload_test0", "450");
+   Destination converted = SolaceIO.convertToJcsmpDestination(testRecord.getDestination());
+   assertTrue(converted instanceof Topic);
+   assertFalse(converted instanceof Queue);
+ }
+
+ @Test
+ public void testTopicEncoding() {
+   // Fake broker serving three distinct messages.
+   SessionServiceFactory sessionServiceFactory =
+       new MockSessionServiceFactory(
+           new MockSessionService(
+               index ->
+                   getOrNull(
+                       index,
+                       ImmutableList.of(
+                           SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
+                           SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
+                           SolaceDataUtils.getBytesXmlMessage("payload_test2", "452"))),
+               3));
+
+   // Read the records through the pipeline.
+   PCollection<Solace.Record> records =
+       pipeline.apply(
+           "Read from Solace", getDefaultRead().withSessionServiceFactory(sessionServiceFactory));
+
+   // Map each record to whether its destination converts to a JCSMP Topic.
+   PCollection<Boolean> destinationIsTopic =
+       records.apply(
+           MapElements.into(TypeDescriptors.booleans())
+               .via(
+                   record -> {
+                     Destination converted =
+                         SolaceIO.convertToJcsmpDestination(record.getDestination());
+                     return converted instanceof Topic;
+                   }));
+
+   // All three destinations must be topics.
+   List<Boolean> expectedFlags = ImmutableList.of(true, true, true);
+   PAssert.that(destinationIsTopic).containsInAnyOrder(expectedFlags);
+   pipeline.run();
+ }
+}
diff --git
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
new file mode 100644
index 00000000000..2e953150c6d
--- /dev/null
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
@@ -0,0 +1,708 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.data;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.MessageType;
+import com.solacesystems.jcsmp.ReplicationGroupMessageId;
+import com.solacesystems.jcsmp.SDTMap;
+import com.solacesystems.jcsmp.User_Cos;
+import com.solacesystems.jcsmp.impl.ReplicationGroupMessageIdImpl;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.schemas.JavaBeanSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SolaceDataUtils {
+ public static final ReplicationGroupMessageId DEFAULT_REPLICATION_GROUP_ID =
+ new ReplicationGroupMessageIdImpl(1L, 136L);
+
+  /**
+   * Plain data holder pairing a payload with a message id. Equality, hash code and string form are
+   * all derived from those two fields.
+   */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class SimpleRecord {
+    public String payload;
+    public String messageId;
+
+    /** Creates a record with both fields left unset. */
+    public SimpleRecord() {}
+
+    /** Creates a record holding the given payload and message id. */
+    public SimpleRecord(String payload, String messageId) {
+      this.payload = payload;
+      this.messageId = messageId;
+    }
+
+    /** Two records are equal when both their payloads and their message ids are equal. */
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o instanceof SimpleRecord) {
+        SimpleRecord other = (SimpleRecord) o;
+        return Objects.equals(payload, other.payload)
+            && Objects.equals(messageId, other.messageId);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(payload, messageId);
+    }
+
+    /** Renders as {@code SimpleRecord{payload='...', messageId='...'}}. */
+    @Override
+    public String toString() {
+      return "SimpleRecord{payload='" + payload + "', messageId='" + messageId + "'}";
+    }
+  }
+
+  /**
+   * Builds a {@link Solace.Record} fixture whose replication group message id is taken from
+   * {@link #DEFAULT_REPLICATION_GROUP_ID}.
+   *
+   * @param payload text stored, UTF-8 encoded, as the record payload
+   * @param messageId value stored as the record's message id
+   * @return the populated record
+   */
+  public static Solace.Record getSolaceRecord(String payload, String messageId) {
+    return getSolaceRecord(payload, messageId, null);
+  }
+
+  /**
+   * Builds a {@link Solace.Record} fixture addressed to the topic {@code destination-topic}, with
+   * fixed expiration, time-to-live, priority and receive timestamp values.
+   *
+   * @param payload text stored, UTF-8 encoded, as the record payload
+   * @param messageId value stored as the record's message id
+   * @param replicationGroupMessageId id whose string form is stored on the record; when {@code
+   *     null}, {@link #DEFAULT_REPLICATION_GROUP_ID} is used instead
+   * @return the populated record
+   */
+  public static Solace.Record getSolaceRecord(
+      String payload,
+      String messageId,
+      @Nullable ReplicationGroupMessageId replicationGroupMessageId) {
+    // Fall back to the shared default id when the caller did not supply one.
+    final ReplicationGroupMessageId effectiveGroupId;
+    if (replicationGroupMessageId == null) {
+      effectiveGroupId = DEFAULT_REPLICATION_GROUP_ID;
+    } else {
+      effectiveGroupId = replicationGroupMessageId;
+    }
+
+    Solace.Destination topicDestination =
+        Solace.Destination.builder()
+            .setName("destination-topic")
+            .setType(Solace.DestinationType.TOPIC)
+            .build();
+
+    return Solace.Record.builder()
+        .setPayload(ByteString.copyFrom(payload, StandardCharsets.UTF_8))
+        .setMessageId(messageId)
+        .setDestination(topicDestination)
+        .setExpiration(1000L)
+        .setPriority(0)
+        .setReceiveTimestamp(1708100477067L)
+        .setRedelivered(false)
+        .setReplyTo(null)
+        .setSequenceNumber(null)
+        .setTimeToLive(1000L)
+        .setSenderTimestamp(null)
+        .setReplicationGroupMessageId(effectiveGroupId.toString())
+        .setAttachmentBytes(ByteString.EMPTY)
+        .build();
+  }
+
+  /**
+   * Builds a stub {@link BytesXMLMessage} with no acknowledgement callback and the default
+   * replication group message id.
+   *
+   * @param payload text returned, UTF-8 encoded, by {@code getBytes()}
+   * @param messageId value returned by {@code getApplicationMessageId()}
+   * @return the stub message
+   */
+  public static BytesXMLMessage getBytesXmlMessage(String payload, String messageId) {
+    return getBytesXmlMessage(payload, messageId, null, null);
+  }
+
+  /**
+   * Builds a stub {@link BytesXMLMessage} with the default replication group message id.
+   *
+   * @param payload text returned, UTF-8 encoded, by {@code getBytes()}
+   * @param messageId value returned by {@code getApplicationMessageId()}
+   * @param ackMessageFn callback applied (with argument {@code 0}) when {@code ackMessage()} is
+   *     called on the stub
+   * @return the stub message
+   */
+  public static BytesXMLMessage getBytesXmlMessage(
+      String payload, String messageId, SerializableFunction<Integer, Integer> ackMessageFn) {
+    return getBytesXmlMessage(payload, messageId, ackMessageFn, null);
+  }
+
+  /**
+   * Builds a stub {@link BytesXMLMessage} as an anonymous implementation of the interface.
+   *
+   * <p>Only a handful of methods return caller-controlled or meaningful values:
+   *
+   * <ul>
+   *   <li>{@code getBytes()} / {@code getContentLength()}: the UTF-8 bytes of {@code payload} and
+   *       their length;
+   *   <li>{@code getApplicationMessageId()}: {@code messageId};
+   *   <li>{@code getDestination()}: a JCSMP topic named {@code destination-topic};
+   *   <li>{@code getExpiration()} / {@code getTimeToLive()}: {@code 1000};
+   *   <li>{@code getReceiveTimestamp()}: {@code 1708100477067};
+   *   <li>{@code getReplicationGroupMessageId()}: the supplied id, or {@link
+   *       #DEFAULT_REPLICATION_GROUP_ID} when none is supplied;
+   *   <li>{@code ackMessage()}: applies {@code ackMessageFn} to {@code 0} when it is non-null.
+   * </ul>
+   *
+   * <p>Every other method is a no-op or returns a fixed constant ({@code null}, {@code 0}, {@code
+   * false}, {@code 0x2} for the structured message type, {@code 5} for the topic name length).
+   *
+   * @param payload text returned, UTF-8 encoded, by {@code getBytes()}
+   * @param messageId value returned by {@code getApplicationMessageId()}
+   * @param ackMessageFn optional callback applied when {@code ackMessage()} is called; its return
+   *     value is ignored. May be {@code null}, in which case acknowledging does nothing.
+   * @param replicationGroupMessageId optional id returned by {@code
+   *     getReplicationGroupMessageId()}. May be {@code null}, in which case {@link
+   *     #DEFAULT_REPLICATION_GROUP_ID} is returned.
+   * @return the stub message
+   */
+  public static BytesXMLMessage getBytesXmlMessage(
+      String payload,
+      String messageId,
+      @Nullable SerializableFunction<Integer, Integer> ackMessageFn,
+      @Nullable ReplicationGroupMessageId replicationGroupMessageId) {
+    long receiverTimestamp = 1708100477067L;
+    long expiration = 1000L;
+    long timeToLive = 1000L;
+    String destination = "destination-topic";
+
+    ReplicationGroupMessageId useReplicationGroupId =
+        replicationGroupMessageId != null
+            ? replicationGroupMessageId
+            : DEFAULT_REPLICATION_GROUP_ID;
+    return new BytesXMLMessage() {
+
+      @Override
+      public byte[] getBytes() {
+        return payload.getBytes(StandardCharsets.UTF_8);
+      }
+
+      @Override
+      public int getContentLength() {
+        return payload.getBytes(StandardCharsets.UTF_8).length;
+      }
+
+      @Override
+      public int readBytes(byte[] arg0) {
+        return 0;
+      }
+
+      @Override
+      public int readBytes(byte[] arg0, int arg1) {
+        return 0;
+      }
+
+      @Override
+      public void rewindContent() {}
+
+      @Override
+      public void writeBytes(byte[] arg0) {}
+
+      @Override
+      public void writeBytes(byte[] arg0, int arg1, int arg2) {}
+
+      @Override
+      public void ackMessage() {
+        // The callback is optional; its argument is always 0 and its result is discarded.
+        if (ackMessageFn != null) {
+          ackMessageFn.apply(0);
+        }
+      }
+
+      @Override
+      public void clearAttachment() {}
+
+      @Override
+      public void clearBinaryMetadataBytes(int arg0) {}
+
+      @Override
+      public void clearContent() {}
+
+      @Override
+      public void clearQueueNameLocation() {}
+
+      @Override
+      public void clearTopicNameLocation() {}
+
+      @Override
+      public String dump() {
+        return null;
+      }
+
+      @Override
+      public String dump(int arg0) {
+        return null;
+      }
+
+      @Override
+      public long getAckMessageId() {
+        return 0;
+      }
+
+      @Override
+      public String getAppMessageID() {
+        return null;
+      }
+
+      @Override
+      public String getAppMessageType() {
+        return null;
+      }
+
+      @Override
+      public String getApplicationMessageId() {
+        return messageId;
+      }
+
+      @Override
+      public String getApplicationMessageType() {
+        return null;
+      }
+
+      @Override
+      public ByteBuffer getAttachmentByteBuffer() {
+        return null;
+      }
+
+      @Override
+      public int getAttachmentContentLength() {
+        return 0;
+      }
+
+      @Override
+      public int getBinaryMetadataContentLength(int arg0) {
+        return 0;
+      }
+
+      @Override
+      public Collection<Integer> getBinaryMetadataTypes() {
+        return null;
+      }
+
+      @Override
+      public Long getCacheRequestId() {
+        return null;
+      }
+
+      @Override
+      public List<Long> getConsumerIdList() {
+        return null;
+      }
+
+      @Override
+      public String getCorrelationId() {
+        return null;
+      }
+
+      @Override
+      public Object getCorrelationKey() {
+        return null;
+      }
+
+      @Override
+      public User_Cos getCos() {
+        return null;
+      }
+
+      @Override
+      public boolean getDeliverToOne() {
+        return false;
+      }
+
+      @Override
+      public int getDeliveryCount() throws UnsupportedOperationException {
+        return 0;
+      }
+
+      @Override
+      public DeliveryMode getDeliveryMode() {
+        return null;
+      }
+
+      @Override
+      public Destination getDestination() {
+        return JCSMPFactory.onlyInstance().createTopic(destination);
+      }
+
+      @Override
+      public String getDestinationTopicSuffix() {
+        return null;
+      }
+
+      @Override
+      public boolean getDiscardIndication() {
+        return false;
+      }
+
+      @Override
+      public long getExpiration() {
+        return expiration;
+      }
+
+      @Override
+      public String getHTTPContentEncoding() {
+        return null;
+      }
+
+      @Override
+      public String getHTTPContentType() {
+        return null;
+      }
+
+      @Override
+      public String getMessageId() {
+        return null;
+      }
+
+      @Override
+      public long getMessageIdLong() {
+        return 0;
+      }
+
+      @Override
+      public MessageType getMessageType() {
+        return null;
+      }
+
+      @Override
+      public int getPriority() {
+        return 0;
+      }
+
+      @Override
+      public SDTMap getProperties() {
+        return null;
+      }
+
+      @Override
+      public int getQueueNameLength() {
+        return 0;
+      }
+
+      @Override
+      public int getQueueNameOffset() {
+        return 0;
+      }
+
+      @Override
+      public long getReceiveTimestamp() {
+        return receiverTimestamp;
+      }
+
+      @Override
+      public boolean getRedelivered() {
+        return false;
+      }
+
+      @Override
+      public ReplicationGroupMessageId getReplicationGroupMessageId() {
+        // this is always set by Solace
+        return useReplicationGroupId;
+      }
+
+      @Override
+      public Destination getReplyTo() {
+        return null;
+      }
+
+      @Override
+      public String getReplyToSuffix() {
+        return null;
+      }
+
+      @Override
+      public Long getSendTimestamp() {
+        return null;
+      }
+
+      @Override
+      public String getSenderID() {
+        return null;
+      }
+
+      @Override
+      public String getSenderId() {
+        return null;
+      }
+
+      @Override
+      public Long getSenderTimestamp() {
+        return null;
+      }
+
+      @Override
+      public Long getSequenceNumber() {
+        return null;
+      }
+
+      @Override
+      public byte getStructuredMsgType() {
+        return 0x2;
+      }
+
+      @Override
+      public boolean getTQDiscardIndication() {
+        return false;
+      }
+
+      @Override
+      public long getTimeToLive() {
+        return timeToLive;
+      }
+
+      @Override
+      public int getTopicNameLength() {
+        return 5;
+      }
+
+      @Override
+      public int getTopicNameOffset() {
+        return 0;
+      }
+
+      @Override
+      public Long getTopicSequenceNumber() {
+        return null;
+      }
+
+      @Override
+      public byte[] getUserData() {
+        return null;
+      }
+
+      @Override
+      public boolean hasAttachment() {
+        return false;
+      }
+
+      @Override
+      public boolean hasBinaryMetadata(int arg0) {
+        return false;
+      }
+
+      @Override
+      public boolean hasContent() {
+        return false;
+      }
+
+      @Override
+      public boolean hasUserData() {
+        return false;
+      }
+
+      @Override
+      public boolean isAckImmediately() {
+        return false;
+      }
+
+      @Override
+      public boolean isCacheMessage() {
+        return false;
+      }
+
+      @Override
+      public boolean isDMQEligible() {
+        return false;
+      }
+
+      @Override
+      public boolean isDeliveryCountSupported() {
+        return false;
+      }
+
+      @Override
+      public boolean isElidingEligible() {
+        return false;
+      }
+
+      @Override
+      public boolean isReadOnly() {
+        return false;
+      }
+
+      @Override
+      public boolean isReplyMessage() {
+        return false;
+      }
+
+      @Override
+      public boolean isStructuredMsg() {
+        return false;
+      }
+
+      @Override
+      public boolean isSuspect() {
+        return false;
+      }
+
+      @Override
+      public int readAttachmentBytes(byte[] arg0) {
+        return 0;
+      }
+
+      @Override
+      public int readAttachmentBytes(byte[] arg0, int arg1) {
+        return 0;
+      }
+
+      @Override
+      public int readAttachmentBytes(int arg0, byte[] arg1, int arg2, int arg3) {
+        return 0;
+      }
+
+      @Override
+      public int readBinaryMetadataBytes(int arg0, byte[] arg1) {
+        return 0;
+      }
+
+      @Override
+      public int readContentBytes(byte[] arg0) {
+        return 0;
+      }
+
+      @Override
+      public int readContentBytes(byte[] arg0, int arg1) {
+        return 0;
+      }
+
+      @Override
+      public int readContentBytes(int arg0, byte[] arg1, int arg2, int arg3) {
+        return 0;
+      }
+
+      @Override
+      public void rejectMessage() {}
+
+      @Override
+      public void reset() {}
+
+      @Override
+      public void resetPayload() {}
+
+      @Override
+      public void rewindAttachment() {}
+
+      @Override
+      public void setAckImmediately(boolean arg0) {}
+
+      @Override
+      public void setAppMessageID(String arg0) {}
+
+      @Override
+      public void setAppMessageType(String arg0) {}
+
+      @Override
+      public void setApplicationMessageId(String arg0) {}
+
+      @Override
+      public void setApplicationMessageType(String arg0) {}
+
+      @Override
+      public void setAsReplyMessage(boolean arg0) {}
+
+      @Override
+      public void setCorrelationId(String arg0) {}
+
+      @Override
+      public void setCorrelationKey(Object arg0) {}
+
+      @Override
+      public void setCos(User_Cos arg0) {}
+
+      @Override
+      public void setDMQEligible(boolean arg0) {}
+
+      @Override
+      public void setDeliverToOne(boolean arg0) {}
+
+      @Override
+      public void setDeliveryMode(DeliveryMode arg0) {}
+
+      @Override
+      public void setElidingEligible(boolean arg0) {}
+
+      @Override
+      public void setExpiration(long arg0) {}
+
+      @Override
+      public void setHTTPContentEncoding(String arg0) {}
+
+      @Override
+      public void setHTTPContentType(String arg0) {}
+
+      @Override
+      public void setMessageType(MessageType arg0) {}
+
+      @Override
+      public void setPriority(int arg0) {}
+
+      @Override
+      public void setProperties(SDTMap arg0) {}
+
+      @Override
+      public void setQueueNameLocation(int arg0, int arg1) {}
+
+      @Override
+      public void setReadOnly() {}
+
+      @Override
+      public void setReplyTo(Destination arg0) {}
+
+      @Override
+      public void setReplyToSuffix(String arg0) {}
+
+      @Override
+      public void setSendTimestamp(long arg0) {}
+
+      @Override
+      public void setSenderID(String arg0) {}
+
+      @Override
+      public void setSenderId(String arg0) {}
+
+      @Override
+      public void setSenderTimestamp(long arg0) {}
+
+      @Override
+      public void setSequenceNumber(long arg0) {}
+
+      @Override
+      public void setStructuredMsg(boolean arg0) {}
+
+      @Override
+      public void setStructuredMsgType(byte arg0) {}
+
+      @Override
+      public void setTimeToLive(long arg0) {}
+
+      @Override
+      public void setTopicNameLocation(int arg0, int arg1) {}
+
+      @Override
+      public void setUserData(byte[] arg0) {}
+
+      @Override
+      public void settle(Outcome arg0) throws JCSMPException {}
+
+      @Override
+      public int writeAttachment(byte[] arg0) {
+        return 0;
+      }
+
+      @Override
+      public int writeAttachment(InputStream arg0) throws IOException {
+        return 0;
+      }
+
+      @Override
+      public int writeAttachment(byte[] arg0, int arg1, int arg2) throws BufferUnderflowException {
+        return 0;
+      }
+
+      @Override
+      public int writeBinaryMetadataBytes(int arg0, byte[] arg1) {
+        return 0;
+      }
+
+      @Override
+      public int writeBinaryMetadataBytes(int arg0, byte[] arg1, int arg2, int arg3)
+          throws BufferUnderflowException {
+        return 0;
+      }
+
+      @Override
+      public int writeNewAttachment(byte[] arg0) {
+        return 0;
+      }
+
+      @Override
+      public int writeNewAttachment(InputStream arg0) throws IOException {
+        return 0;
+      }
+
+      @Override
+      public int writeNewAttachment(byte[] arg0, int arg1, int arg2)
+          throws BufferUnderflowException {
+        return 0;
+      }
+
+      @Override
+      public int writeNewAttachment(InputStream arg0, int arg1, int arg2) throws IOException {
+        return 0;
+      }
+    };
+  }
+}