This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 04ed95d6037 JCSMP properties providers for new SolaceIO write
connector (#31906)
04ed95d6037 is described below
commit 04ed95d603781ded7a7649500f18f34c4bbc6dfd
Author: Israel Herraiz <[email protected]>
AuthorDate: Mon Jul 22 17:19:08 2024 +0200
JCSMP properties providers for new SolaceIO write connector (#31906)
* JCSMP properties providers for new SolaceIO write connector
This adds a base class and two providers for the new SolaceIO write
connector. The basic authentication provider just uses a username and
password.
Users can write their own providers to set authentication mechanisms
and other session properties, by extending from the base class. As an
example, we provide a GoogleCloudSecretProvider, to show how to create
your own. This provider can also be used "as is", but it is mainly
meant as an example to implement your own.
In upcoming PRs, I will be submitting the rest of the write
connector. It is thousands of lines of code, so I am splitting in
smaller PRs, to facilitate code reviews.
This PR contributes to #31905
* Fix CheckStyle and Spotbugs errors
* Fix class dependencies declaration
* Unify session and auth providers/factories for the read and write
connectors
* Fix format violations in comment
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +
sdks/java/io/solace/build.gradle | 3 +
.../org/apache/beam/sdk/io/solace/SolaceIO.java | 10 ++
.../broker/BasicAuthJcsmpSessionService.java | 21 ++-
.../BasicAuthJcsmpSessionServiceFactory.java | 3 +-
.../broker/GCPSecretSessionServiceFactory.java | 169 ++++++++++++++++++
.../beam/sdk/io/solace/broker/SessionService.java | 196 ++++++++++++++++++++-
.../io/solace/broker/SessionServiceFactory.java | 3 +-
.../sdk/io/solace/MockEmptySessionService.java | 8 +-
.../beam/sdk/io/solace/MockSessionService.java | 28 ++-
.../solace/broker/BasicAuthWriterSessionTest.java | 106 +++++++++++
.../broker/OverrideWriterPropertiesTest.java | 56 ++++++
12 files changed, 587 insertions(+), 18 deletions(-)
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index bbd23a08bcd..e603e49f842 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -758,6 +758,7 @@ class BeamModulePlugin implements Plugin<Project> {
// [bomupgrader] the BOM version is set by
scripts/tools/bomupgrader.py. If update manually, also update
// libraries-bom version on
sdks/java/container/license_scripts/dep_urls_java.yaml
google_cloud_platform_libraries_bom :
"com.google.cloud:libraries-bom:26.39.0",
+ google_cloud_secret_manager :
"com.google.cloud:google-cloud-secretmanager", //
google_cloud_platform_libraries_bom sets version
google_cloud_spanner :
"com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom
sets version
google_cloud_spanner_test :
"com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
google_cloud_vertexai :
"com.google.cloud:google-cloud-vertexai", //
google_cloud_platform_libraries_bom sets version
@@ -858,6 +859,7 @@ class BeamModulePlugin implements Plugin<Project> {
proto_google_cloud_firestore_v1 :
"com.google.api.grpc:proto-google-cloud-firestore-v1", //
google_cloud_platform_libraries_bom sets version
proto_google_cloud_pubsub_v1 :
"com.google.api.grpc:proto-google-cloud-pubsub-v1", //
google_cloud_platform_libraries_bom sets version
proto_google_cloud_pubsublite_v1 :
"com.google.api.grpc:proto-google-cloud-pubsublite-v1", //
google_cloud_platform_libraries_bom sets version
+ proto_google_cloud_secret_manager_v1 :
"com.google.api.grpc:proto-google-cloud-secretmanager-v1", //
google_cloud_platform_libraries_bom sets version
proto_google_cloud_spanner_v1 :
"com.google.api.grpc:proto-google-cloud-spanner-v1", //
google_cloud_platform_libraries_bom sets version
proto_google_cloud_spanner_admin_database_v1:
"com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", //
google_cloud_platform_libraries_bom sets version
proto_google_common_protos :
"com.google.api.grpc:proto-google-common-protos", //
google_cloud_platform_libraries_bom sets version
diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle
index 7a74236539f..741db51a577 100644
--- a/sdks/java/io/solace/build.gradle
+++ b/sdks/java/io/solace/build.gradle
@@ -34,6 +34,9 @@ dependencies {
implementation library.java.joda_time
implementation library.java.solace
implementation library.java.google_cloud_core
+ implementation library.java.google_cloud_secret_manager
+ implementation library.java.proto_google_cloud_secret_manager_v1
+ implementation library.java.protobuf_java
implementation library.java.vendored_guava_32_1_2_jre
implementation project(":sdks:java:extensions:avro")
implementation library.java.avro
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
index e6b0dd34b18..980267b3f35 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
@@ -181,6 +181,10 @@ import org.slf4j.LoggerFactory;
*
* }</pre>
*
+ * <h2>Writing</h2>
+ *
+ * TBD
+ *
* <h3>Authentication</h3>
*
* <p>When reading from Solace, the user must use {@link
@@ -209,6 +213,12 @@ public class SolaceIO {
};
private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;
+ // Part of the new write connector, documentation to be updated in upcoming
pull requests
+ public enum SubmissionMode {
+ HIGHER_THROUGHPUT,
+ LOWER_LATENCY
+ }
+
/** Get a {@link Topic} object from the topic name. */
static Topic topicFromName(String topicName) {
return JCSMPFactory.onlyInstance().createTopic(topicName);
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
index 7863dbd129c..df814b5c2be 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
@@ -39,7 +39,7 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
* <p>This class provides a way to connect to a Solace broker and receive
messages from a queue. The
* connection is established using basic authentication.
*/
-public class BasicAuthJcsmpSessionService implements SessionService {
+public class BasicAuthJcsmpSessionService extends SessionService {
private final String queueName;
private final String host;
private final String username;
@@ -137,12 +137,19 @@ public class BasicAuthJcsmpSessionService implements
SessionService {
}
private JCSMPSession createSessionObject() throws InvalidPropertiesException
{
- JCSMPProperties properties = new JCSMPProperties();
- properties.setProperty(JCSMPProperties.HOST, host);
- properties.setProperty(JCSMPProperties.USERNAME, username);
- properties.setProperty(JCSMPProperties.PASSWORD, password);
- properties.setProperty(JCSMPProperties.VPN_NAME, vpnName);
-
+ JCSMPProperties properties = initializeSessionProperties(new
JCSMPProperties());
return JCSMPFactory.onlyInstance().createSession(properties);
}
+
+ @Override
+ public JCSMPProperties initializeSessionProperties(JCSMPProperties
baseProps) {
+ baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName);
+
+ baseProps.setProperty(
+ JCSMPProperties.AUTHENTICATION_SCHEME,
JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
+ baseProps.setProperty(JCSMPProperties.USERNAME, username);
+ baseProps.setProperty(JCSMPProperties.PASSWORD, password);
+ baseProps.setProperty(JCSMPProperties.HOST, host);
+ return baseProps;
+ }
}
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
index 8cb4ff0af05..2084e61b7e3 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.solace.broker;
+import static
org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import com.google.auto.value.AutoValue;
@@ -39,7 +40,7 @@ public abstract class BasicAuthJcsmpSessionServiceFactory
extends SessionService
public abstract String vpnName();
public static Builder builder() {
- return new AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder();
+ return new
AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder().vpnName(DEFAULT_VPN_NAME);
}
@AutoValue.Builder
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java
new file mode 100644
index 00000000000..dd87e1d75fa
--- /dev/null
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static
org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
+import com.google.cloud.secretmanager.v1.SecretVersionName;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements a {@link SessionServiceFactory} that retrieve the
basic authentication
+ * credentials from a Google Cloud Secret Manager secret.
+ *
+ * <p>It can be used to avoid having to pass the password as an option of your
pipeline. For this
+ * provider to work, the worker where the job runs needs to have the necessary
credentials to access
+ * the secret. In Dataflow, this implies adding the necessary permissions to
the worker service
+ * account. For other runners, set the credentials in the pipeline options
using {@link
+ * org.apache.beam.sdk.extensions.gcp.options.GcpOptions}.
+ *
+ * <p>It also shows how to implement a {@link SessionServiceFactory} that
depends on using external
+ * resources to retrieve the Solace session properties. In this case, using
the Google Cloud Secrete
+ * Manager client.
+ *
+ * <p>Example of how to create the provider object:
+ *
+ * <pre>{@code
+ * GCPSecretSessionServiceFactory factory =
+ * GCPSecretSessionServiceFactory.builder()
+ * .username("user")
+ * .host("host:port")
+ * .passwordSecretName("secret-name")
+ * .build();
+ *
+ * SessionService serviceUsingGCPSecret = factory.create();
+ * }</pre>
+ */
+@AutoValue
+public abstract class GCPSecretSessionServiceFactory extends
SessionServiceFactory {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GCPSecretSessionServiceFactory.class);
+
+ private static final String PROJECT_NOT_FOUND = "PROJECT-NOT-FOUND";
+
+ public abstract String username();
+
+ public abstract String host();
+
+ public abstract String passwordSecretName();
+
+ public abstract String vpnName();
+
+ public abstract @Nullable String secretManagerProjectId();
+
+ public abstract String passwordSecretVersion();
+
+ public static GCPSecretSessionServiceFactory.Builder builder() {
+ return new AutoValue_GCPSecretSessionServiceFactory.Builder()
+ .passwordSecretVersion("latest")
+ .vpnName(DEFAULT_VPN_NAME);
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ /** Username to be used to authenticate with the broker. */
+ public abstract GCPSecretSessionServiceFactory.Builder username(String
username);
+
+ /**
+ * The location of the broker, including port details if it is not
listening in the default
+ * port.
+ */
+ public abstract GCPSecretSessionServiceFactory.Builder host(String host);
+
+ /** The Secret Manager secret name where the password is stored. */
+ public abstract GCPSecretSessionServiceFactory.Builder
passwordSecretName(String name);
+
+ /** Optional. Solace broker VPN name. If not set, "default" is used. */
+ public abstract GCPSecretSessionServiceFactory.Builder vpnName(String
name);
+
+ /**
+ * Optional for Dataflow or VMs running on Google Cloud. The project id of
the project where the
+ * secret is stored. If not set, the project id where the job is running
is used.
+ */
+ public abstract GCPSecretSessionServiceFactory.Builder
secretManagerProjectId(String id);
+
+ /** Optional. Solace broker password secret version. If not set, "latest"
is used. */
+ public abstract GCPSecretSessionServiceFactory.Builder
passwordSecretVersion(String version);
+
+ public abstract GCPSecretSessionServiceFactory build();
+ }
+
+ @Override
+ public SessionService create() {
+ String password = null;
+ try {
+ password = retrieveSecret();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ BasicAuthJcsmpSessionServiceFactory factory =
+ BasicAuthJcsmpSessionServiceFactory.builder()
+ .username(username())
+ .host(host())
+ .password(password)
+ .vpnName(vpnName())
+ .build();
+
+ return factory.create();
+ }
+
+ private String retrieveSecret() throws IOException {
+ try (SecretManagerServiceClient client =
SecretManagerServiceClient.create()) {
+ String projectId =
+
Optional.ofNullable(secretManagerProjectId()).orElse(getProjectIdFromVmMetadata());
+ SecretVersionName secretVersionName =
+ SecretVersionName.of(projectId, passwordSecretName(),
passwordSecretVersion());
+ return
client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getProjectIdFromVmMetadata() throws IOException {
+ URL metadataUrl =
+ new
URL("http://metadata.google.internal/computeMetadata/v1/project/project-id");
+ HttpURLConnection connection = (HttpURLConnection)
metadataUrl.openConnection();
+ connection.setRequestProperty("Metadata-Flavor", "Google");
+
+ String output;
+ try (BufferedReader reader =
+ new BufferedReader(
+ new InputStreamReader(connection.getInputStream(),
StandardCharsets.UTF_8))) {
+ output = reader.readLine();
+ }
+
+ if (output == null || output.isEmpty()) {
+ LOG.error(
+ "Cannot retrieve project id from VM metadata, please set a project
id in your GoogleCloudSecretProvider.");
+ }
+ return output != null ? output : PROJECT_NOT_FOUND;
+ }
+}
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
index cd368865f0c..aed700a71de 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
@@ -17,34 +17,220 @@
*/
package org.apache.beam.sdk.io.solace.broker;
+import com.solacesystems.jcsmp.JCSMPProperties;
import java.io.Serializable;
+import org.apache.beam.sdk.io.solace.SolaceIO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The SessionService interface provides a set of methods for managing a
session with the Solace
* messaging system. It allows for establishing a connection, creating a
message-receiver object,
* checking if the connection is closed or not, and gracefully closing the
session.
+ *
+ * <p>Override this class and the method {@link
#initializeSessionProperties(JCSMPProperties)} with
+ * your specific properties, including all those related to authentication.
+ *
+ * <p>The connector will call the method only once per session created, so you
can perform
+ * relatively heavy operations in that method (e.g. connect to a store or
vault to retrieve
+ * credentials).
+ *
+ * <p>There are some default properties that are set by default and can be
overridden in this
+ * provider, that are relevant for the writer connector, and not used in the
case of the read
+ * connector (since they are not necessary for reading):
+ *
+ * <ul>
+ * <li>VPN_NAME: default
+ * <li>GENERATE_SEND_TIMESTAMPS: true
+ * <li>PUB_MULTI_THREAD: true
+ * </ul>
+ *
+ * <p>The connector overrides other properties, regardless of what this
provider sends to the
+ * connector. Those properties are the following. Again, these properties are
only relevant for the
+ * write connector.
+ *
+ * <ul>
+ * <li>PUB_ACK_WINDOW_SIZE
+ * <li>MESSAGE_CALLBACK_ON_REACTOR
+ * </ul>
+ *
+ * Those properties are set by the connector based on the values of {@link
+ *
org.apache.beam.sdk.io.solace.SolaceIO.Write#withWriterType(SolaceIO.WriterType)}
and {@link
+ *
org.apache.beam.sdk.io.solace.SolaceIO.Write#withSubmissionMode(SolaceIO.SubmissionMode)}.
+ *
+ * <p>The method will always run in a worker thread or task, and not in the
driver program. If you
+ * need to access any resource to set the properties, you need to make sure
that the worker has the
+ * network connectivity required for that, and that any credential or
configuration is passed to the
+ * provider through the constructor.
+ *
+ * <p>The connector ensures that no two threads will be calling that method at
the same time, so you
+ * don't have to take any specific precautions to avoid race conditions.
+ *
+ * <p>For basic authentication, use {@link BasicAuthJcsmpSessionService} and
{@link
+ * BasicAuthJcsmpSessionServiceFactory}.
+ *
+ * <p>For other situations, you need to extend this class. For instance:
+ *
+ * <pre>{@code
+ * public class MySessionService extends SessionService {
+ * private final String authToken;
+ *
+ * public MySessionService(String token) {
+ * this.oauthToken = token;
+ * ...
+ * }
+ *
+ * {@literal }@Override
+ * public JCSMPProperties initializeSessionProperties(JCSMPProperties
baseProps) {
+ * baseProps.setProperty(JCSMPProperties.AUTHENTICATION_SCHEME,
JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2);
+ * baseProps.setProperty(JCSMPProperties.OAUTH2_ACCESS_TOKEN, authToken);
+ * return props;
+ * }
+ *
+ * {@literal }@Override
+ * public void connect() {
+ * ...
+ * }
+ *
+ * ...
+ * }
+ * }</pre>
*/
-public interface SessionService extends Serializable {
+public abstract class SessionService implements Serializable {
+ private static final Logger LOG =
LoggerFactory.getLogger(SessionService.class);
+
+ public static final String DEFAULT_VPN_NAME = "default";
+
+ private static final int STREAMING_PUB_ACK_WINDOW = 50;
+ private static final int BATCHED_PUB_ACK_WINDOW = 255;
/**
* Establishes a connection to the service. This could involve providing
connection details like
* host, port, VPN name, username, and password.
*/
- void connect();
+ public abstract void connect();
/** Gracefully closes the connection to the service. */
- void close();
+ public abstract void close();
/**
* Checks whether the connection to the service is currently closed. This
method is called when an
* `UnboundedSolaceReader` is starting to read messages - a session will be
created if this
* returns true.
*/
- boolean isClosed();
+ public abstract boolean isClosed();
/**
* Creates a MessageReceiver object for receiving messages from Solace.
Typically, this object is
* created from the session instance.
*/
- MessageReceiver createReceiver();
+ public abstract MessageReceiver createReceiver();
+
+ /**
+ * Override this method and provide your specific properties, including all
those related to
+ * authentication, and possibly others too. The {@code}baseProperties{@code}
parameter sets the
+ * Solace VPN to "default" if none is specified.
+ *
+ * <p>You should add your properties to the parameter
{@code}baseProperties{@code}, and return the
+ * result.
+ *
+ * <p>The method will be used whenever the session needs to be created or
refreshed. If you are
+ * setting credentials with expiration, just make sure that the latest
available credentials (e.g.
+ * renewed token) are set when the method is called.
+ *
+ * <p>For a list of all the properties that can be set, please check the
following link:
+ *
+ * <ul>
+ * <li><a
+ *
href="https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html">https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html</a>
+ * </ul>
+ */
+ public abstract JCSMPProperties initializeSessionProperties(JCSMPProperties
baseProperties);
+
+ /**
+ * This method will be called by the write connector when a new session is
started.
+ *
+ * <p>This call will happen in the worker, so you need to make sure that the
worker has access to
+ * the resources you need to set the properties.
+ *
+ * <p>The call will happen only once per session initialization. Typically,
that will be when the
+ * worker and the client are created. But if for any reason the session is
lost (e.g. expired auth
+ * token), this method will be called again.
+ */
+ public final JCSMPProperties
initializeWriteSessionProperties(SolaceIO.SubmissionMode mode) {
+ JCSMPProperties jcsmpProperties =
initializeSessionProperties(getDefaultProperties());
+ return overrideConnectorProperties(jcsmpProperties, mode);
+ }
+
+ private static JCSMPProperties getDefaultProperties() {
+ JCSMPProperties props = new JCSMPProperties();
+ props.setProperty(JCSMPProperties.VPN_NAME, DEFAULT_VPN_NAME);
+ // Outgoing messages will have a sender timestamp field populated
+ props.setProperty(JCSMPProperties.GENERATE_SEND_TIMESTAMPS, true);
+ // Make XMLProducer safe to access from several threads. This is the
default value, setting
+ // it just in case.
+ props.setProperty(JCSMPProperties.PUB_MULTI_THREAD, true);
+
+ return props;
+ }
+
+ /**
+ * This method overrides some properties for the broker session to prevent
misconfiguration,
+ * taking into account how the write connector works.
+ */
+ private static JCSMPProperties overrideConnectorProperties(
+ JCSMPProperties props, SolaceIO.SubmissionMode mode) {
+
+ // PUB_ACK_WINDOW_SIZE heavily affects performance when publishing
persistent
+ // messages. It can be a value between 1 and 255. This is the batch size
for the ack
+ // received from Solace. A value of 1 will have the lowest latency, but a
very low
+ // throughput and a monumental backpressure.
+
+ // This controls how the messages are sent to Solace
+ if (mode == SolaceIO.SubmissionMode.HIGHER_THROUGHPUT) {
+ // Create a parallel thread and a queue to send the messages
+
+ Boolean msgCbProp =
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR);
+ if (msgCbProp != null && msgCbProp) {
+ LOG.warn(
+ "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to false
since"
+ + " HIGHER_THROUGHPUT mode was selected");
+ }
+
+ props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, false);
+
+ Integer ackWindowSize =
props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
+ if ((ackWindowSize != null && ackWindowSize != BATCHED_PUB_ACK_WINDOW)) {
+ LOG.warn(
+ String.format(
+ "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
+ + " HIGHER_THROUGHPUT mode was selected",
+ BATCHED_PUB_ACK_WINDOW));
+ }
+ props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE,
BATCHED_PUB_ACK_WINDOW);
+ } else {
+ // Send from the same thread where the produced is being called. This
offers the lowest
+ // latency, but a low throughput too.
+ Boolean msgCbProp =
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR);
+ if (msgCbProp != null && !msgCbProp) {
+ LOG.warn(
+ "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to true
since"
+ + " LOWER_LATENCY mode was selected");
+ }
+
+ props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true);
+
+ Integer ackWindowSize =
props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
+ if ((ackWindowSize != null && ackWindowSize !=
STREAMING_PUB_ACK_WINDOW)) {
+ LOG.warn(
+ String.format(
+ "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
+ + " LOWER_LATENCY mode was selected",
+ STREAMING_PUB_ACK_WINDOW));
+ }
+
+ props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE,
STREAMING_PUB_ACK_WINDOW);
+ }
+ return props;
+ }
}
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
index 7d1dee7a118..027de2cff13 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
@@ -26,9 +26,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
* queue property and mandates the implementation of a create() method in
concrete subclasses.
*/
public abstract class SessionServiceFactory implements Serializable {
-
/**
- * A reference to a Queue object. This is set when the pipline is
constructed (in the {@link
+ * A reference to a Queue object. This is set when the pipeline is
constructed (in the {@link
*
org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)}
method).
* This could be used to associate the created SessionService with a
specific queue for message
* handling.
diff --git
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
index 285c1cb8a7e..ec0ae719468 100644
---
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
@@ -17,10 +17,11 @@
*/
package org.apache.beam.sdk.io.solace;
+import com.solacesystems.jcsmp.JCSMPProperties;
import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
import org.apache.beam.sdk.io.solace.broker.SessionService;
-public class MockEmptySessionService implements SessionService {
+public class MockEmptySessionService extends SessionService {
String exceptionMessage = "This is an empty client, use a MockSessionService
instead.";
@@ -43,4 +44,9 @@ public class MockEmptySessionService implements
SessionService {
public void connect() {
throw new UnsupportedOperationException(exceptionMessage);
}
+
+ @Override
+ public JCSMPProperties initializeSessionProperties(JCSMPProperties
baseProperties) {
+ throw new UnsupportedOperationException(exceptionMessage);
+ }
}
diff --git
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
index 7b14da138c6..207cfef9c62 100644
---
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
@@ -18,23 +18,35 @@
package org.apache.beam.sdk.io.solace;
import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.JCSMPProperties;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
import org.apache.beam.sdk.io.solace.broker.SessionService;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.checkerframework.checker.nullness.qual.Nullable;
-public class MockSessionService implements SessionService {
+public class MockSessionService extends SessionService {
private final SerializableFunction<Integer, BytesXMLMessage> getRecordFn;
private MessageReceiver messageReceiver = null;
private final int minMessagesReceived;
+ private final @Nullable SubmissionMode mode;
public MockSessionService(
- SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int
minMessagesReceived) {
+ SerializableFunction<Integer, BytesXMLMessage> getRecordFn,
+ int minMessagesReceived,
+ @Nullable SubmissionMode mode) {
this.getRecordFn = getRecordFn;
this.minMessagesReceived = minMessagesReceived;
+ this.mode = mode;
+ }
+
+ public MockSessionService(
+ SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int
minMessagesReceived) {
+ this(getRecordFn, minMessagesReceived, null);
}
@Override
@@ -85,4 +97,16 @@ public class MockSessionService implements SessionService {
return counter.get() >= minMessagesReceived;
}
}
+
+ @Override
+ public JCSMPProperties initializeSessionProperties(JCSMPProperties
baseProperties) {
+ // Let's override some properties that will be overriden by the connector
+ // Opposite of the mode, to test that is overriden
+ baseProperties.setProperty(
+ JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, mode ==
SubmissionMode.HIGHER_THROUGHPUT);
+
+ baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, 87);
+
+ return baseProperties;
+ }
}
diff --git
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/BasicAuthWriterSessionTest.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/BasicAuthWriterSessionTest.java
new file mode 100644
index 00000000000..e33917641e3
--- /dev/null
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/BasicAuthWriterSessionTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static
org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
+import static org.junit.Assert.assertEquals;
+
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.JCSMPProperties;
+import com.solacesystems.jcsmp.Queue;
+import org.apache.beam.sdk.io.solace.SolaceIO;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BasicAuthWriterSessionTest {
+ private final String username = "Some Username";
+ private final String password = "Some Password";
+ private final String host = "Some Host";
+ private final String vpn = "Some non default VPN";
+ SessionService withVpn;
+ SessionService withoutVpn;
+
+ @Before
+ public void setUp() throws Exception {
+ Queue q = JCSMPFactory.onlyInstance().createQueue("test-queue");
+
+ BasicAuthJcsmpSessionServiceFactory factoryWithVpn =
+ BasicAuthJcsmpSessionServiceFactory.builder()
+ .username(username)
+ .password(password)
+ .host(host)
+ .vpnName(vpn)
+ .build();
+ factoryWithVpn.setQueue(q);
+ withVpn = factoryWithVpn.create();
+
+ BasicAuthJcsmpSessionServiceFactory factoryNoVpn =
+ BasicAuthJcsmpSessionServiceFactory.builder()
+ .username(username)
+ .password(password)
+ .host(host)
+ .build();
+ factoryNoVpn.setQueue(q);
+ withoutVpn = factoryNoVpn.create();
+ }
+
+ @Test
+ public void testAuthProperties() {
+ SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.HIGHER_THROUGHPUT;
+ JCSMPProperties props = withoutVpn.initializeWriteSessionProperties(mode);
+ assertEquals(username, props.getStringProperty(JCSMPProperties.USERNAME));
+ assertEquals(password, props.getStringProperty(JCSMPProperties.PASSWORD));
+ assertEquals(host, props.getStringProperty(JCSMPProperties.HOST));
+ assertEquals(
+ JCSMPProperties.AUTHENTICATION_SCHEME_BASIC,
+ props.getStringProperty(JCSMPProperties.AUTHENTICATION_SCHEME));
+ }
+
+ @Test
+ public void testVpnNames() {
+ SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.LOWER_LATENCY;
+ JCSMPProperties propsWithoutVpn =
withoutVpn.initializeWriteSessionProperties(mode);
+ assertEquals(DEFAULT_VPN_NAME,
propsWithoutVpn.getStringProperty(JCSMPProperties.VPN_NAME));
+ JCSMPProperties propsWithVpn =
withVpn.initializeWriteSessionProperties(mode);
+ assertEquals(vpn,
propsWithVpn.getStringProperty(JCSMPProperties.VPN_NAME));
+ }
+
+ @Test
+ public void testOverrideWithHigherThroughput() {
+ SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.HIGHER_THROUGHPUT;
+ JCSMPProperties props = withoutVpn.initializeWriteSessionProperties(mode);
+
+ assertEquals(false,
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR));
+ assertEquals(
+ Long.valueOf(255),
+
Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE)));
+ }
+
+ @Test
+ public void testOverrideWithLowerLatency() {
+ SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.LOWER_LATENCY;
+ JCSMPProperties props = withoutVpn.initializeWriteSessionProperties(mode);
+ assertEquals(true,
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR));
+ assertEquals(
+ Long.valueOf(50),
+
Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE)));
+ }
+}
diff --git
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java
new file mode 100644
index 00000000000..0c6f88a7c9d
--- /dev/null
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static org.junit.Assert.assertEquals;
+
+import com.solacesystems.jcsmp.JCSMPProperties;
+import org.apache.beam.sdk.io.solace.MockSessionService;
+import org.apache.beam.sdk.io.solace.SolaceIO;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OverrideWriterPropertiesTest {
+ @Test
+ public void testOverrideForHigherThroughput() {
+ SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.HIGHER_THROUGHPUT;
+ MockSessionService service = new MockSessionService(null, 0, mode);
+
+ // Test HIGHER_THROUGHPUT mode
+ JCSMPProperties props = service.initializeWriteSessionProperties(mode);
+ assertEquals(false,
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR));
+ assertEquals(
+ Long.valueOf(255),
+
Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE)));
+ }
+
+ @Test
+ public void testOverrideForLowerLatency() {
+ SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.LOWER_LATENCY;
+ MockSessionService service = new MockSessionService(null, 0, mode);
+
+ // Test HIGHER_THROUGHPUT mode
+ JCSMPProperties props = service.initializeWriteSessionProperties(mode);
+ assertEquals(true,
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR));
+ assertEquals(
+ Long.valueOf(50),
+
Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE)));
+ }
+}