This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 04ed95d6037 JCSMP properties providers for new SolaceIO write 
connector (#31906)
04ed95d6037 is described below

commit 04ed95d603781ded7a7649500f18f34c4bbc6dfd
Author: Israel Herraiz <[email protected]>
AuthorDate: Mon Jul 22 17:19:08 2024 +0200

    JCSMP properties providers for new SolaceIO write connector (#31906)
    
    * JCSMP properties providers for new SolaceIO write connector
    
    This adds a base class and two providers for the new SolaceIO write
    connector. The basic authentication provider just uses a username and
    password.
    
    Users can write their own providers to set authentication mechanisms
    and other session properties, by extending from the base class. As an
    example, we provide a GoogleCloudSecretProvider, to show how to create
    your own. This provider can also be used "as is", but it is mainly
    meant as an example to implement your own.
    
    In upcoming PRs, I will be submitting the rest of the write
    connector. It is thousands of lines of code, so I am splitting in
    smaller PRs, to facilitate code reviews.
    
    This PR contributes to #31905
    
    * Fix CheckStyle and Spotbugs errors
    
    * Fix class dependencies declaration
    
    * Unify session and auth providers/factories for the read and write 
connectors
    
    * Fix format violations in comment
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +
 sdks/java/io/solace/build.gradle                   |   3 +
 .../org/apache/beam/sdk/io/solace/SolaceIO.java    |  10 ++
 .../broker/BasicAuthJcsmpSessionService.java       |  21 ++-
 .../BasicAuthJcsmpSessionServiceFactory.java       |   3 +-
 .../broker/GCPSecretSessionServiceFactory.java     | 169 ++++++++++++++++++
 .../beam/sdk/io/solace/broker/SessionService.java  | 196 ++++++++++++++++++++-
 .../io/solace/broker/SessionServiceFactory.java    |   3 +-
 .../sdk/io/solace/MockEmptySessionService.java     |   8 +-
 .../beam/sdk/io/solace/MockSessionService.java     |  28 ++-
 .../solace/broker/BasicAuthWriterSessionTest.java  | 106 +++++++++++
 .../broker/OverrideWriterPropertiesTest.java       |  56 ++++++
 12 files changed, 587 insertions(+), 18 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index bbd23a08bcd..e603e49f842 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -758,6 +758,7 @@ class BeamModulePlugin implements Plugin<Project> {
         // [bomupgrader] the BOM version is set by 
scripts/tools/bomupgrader.py. If update manually, also update
         // libraries-bom version on 
sdks/java/container/license_scripts/dep_urls_java.yaml
         google_cloud_platform_libraries_bom         : 
"com.google.cloud:libraries-bom:26.39.0",
+        google_cloud_secret_manager                 : 
"com.google.cloud:google-cloud-secretmanager", // 
google_cloud_platform_libraries_bom sets version
         google_cloud_spanner                        : 
"com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom 
sets version
         google_cloud_spanner_test                   : 
"com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
         google_cloud_vertexai                       : 
"com.google.cloud:google-cloud-vertexai", // 
google_cloud_platform_libraries_bom sets version
@@ -858,6 +859,7 @@ class BeamModulePlugin implements Plugin<Project> {
         proto_google_cloud_firestore_v1             : 
"com.google.api.grpc:proto-google-cloud-firestore-v1", // 
google_cloud_platform_libraries_bom sets version
         proto_google_cloud_pubsub_v1                : 
"com.google.api.grpc:proto-google-cloud-pubsub-v1", // 
google_cloud_platform_libraries_bom sets version
         proto_google_cloud_pubsublite_v1            : 
"com.google.api.grpc:proto-google-cloud-pubsublite-v1", // 
google_cloud_platform_libraries_bom sets version
+        proto_google_cloud_secret_manager_v1        : 
"com.google.api.grpc:proto-google-cloud-secretmanager-v1", // 
google_cloud_platform_libraries_bom sets version
         proto_google_cloud_spanner_v1               : 
"com.google.api.grpc:proto-google-cloud-spanner-v1", // 
google_cloud_platform_libraries_bom sets version
         proto_google_cloud_spanner_admin_database_v1: 
"com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // 
google_cloud_platform_libraries_bom sets version
         proto_google_common_protos                  : 
"com.google.api.grpc:proto-google-common-protos", // 
google_cloud_platform_libraries_bom sets version
diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle
index 7a74236539f..741db51a577 100644
--- a/sdks/java/io/solace/build.gradle
+++ b/sdks/java/io/solace/build.gradle
@@ -34,6 +34,9 @@ dependencies {
     implementation library.java.joda_time
     implementation library.java.solace
     implementation library.java.google_cloud_core
+    implementation library.java.google_cloud_secret_manager
+    implementation library.java.proto_google_cloud_secret_manager_v1
+    implementation library.java.protobuf_java
     implementation library.java.vendored_guava_32_1_2_jre
     implementation project(":sdks:java:extensions:avro")
     implementation library.java.avro
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
index e6b0dd34b18..980267b3f35 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
@@ -181,6 +181,10 @@ import org.slf4j.LoggerFactory;
  *
  * }</pre>
  *
+ * <h2>Writing</h2>
+ *
+ * TBD
+ *
  * <h3>Authentication</h3>
  *
  * <p>When reading from Solace, the user must use {@link
@@ -209,6 +213,12 @@ public class SolaceIO {
       };
   private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;
 
+  // Part of the new write connector, documentation to be updated in upcoming 
pull requests
+  public enum SubmissionMode {
+    HIGHER_THROUGHPUT,
+    LOWER_LATENCY
+  }
+
   /** Get a {@link Topic} object from the topic name. */
   static Topic topicFromName(String topicName) {
     return JCSMPFactory.onlyInstance().createTopic(topicName);
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
index 7863dbd129c..df814b5c2be 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
@@ -39,7 +39,7 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
  * <p>This class provides a way to connect to a Solace broker and receive 
messages from a queue. The
  * connection is established using basic authentication.
  */
-public class BasicAuthJcsmpSessionService implements SessionService {
+public class BasicAuthJcsmpSessionService extends SessionService {
   private final String queueName;
   private final String host;
   private final String username;
@@ -137,12 +137,19 @@ public class BasicAuthJcsmpSessionService implements 
SessionService {
   }
 
   private JCSMPSession createSessionObject() throws InvalidPropertiesException 
{
-    JCSMPProperties properties = new JCSMPProperties();
-    properties.setProperty(JCSMPProperties.HOST, host);
-    properties.setProperty(JCSMPProperties.USERNAME, username);
-    properties.setProperty(JCSMPProperties.PASSWORD, password);
-    properties.setProperty(JCSMPProperties.VPN_NAME, vpnName);
-
+    JCSMPProperties properties = initializeSessionProperties(new 
JCSMPProperties());
     return JCSMPFactory.onlyInstance().createSession(properties);
   }
+
+  @Override
+  public JCSMPProperties initializeSessionProperties(JCSMPProperties 
baseProps) {
+    baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName);
+
+    baseProps.setProperty(
+        JCSMPProperties.AUTHENTICATION_SCHEME, 
JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
+    baseProps.setProperty(JCSMPProperties.USERNAME, username);
+    baseProps.setProperty(JCSMPProperties.PASSWORD, password);
+    baseProps.setProperty(JCSMPProperties.HOST, host);
+    return baseProps;
+  }
 }
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
index 8cb4ff0af05..2084e61b7e3 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.solace.broker;
 
+import static 
org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
 import com.google.auto.value.AutoValue;
@@ -39,7 +40,7 @@ public abstract class BasicAuthJcsmpSessionServiceFactory 
extends SessionService
   public abstract String vpnName();
 
   public static Builder builder() {
-    return new AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder();
+    return new 
AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder().vpnName(DEFAULT_VPN_NAME);
   }
 
   @AutoValue.Builder
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java
new file mode 100644
index 00000000000..dd87e1d75fa
--- /dev/null
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static 
org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
+import com.google.cloud.secretmanager.v1.SecretVersionName;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements a {@link SessionServiceFactory} that retrieve the 
basic authentication
+ * credentials from a Google Cloud Secret Manager secret.
+ *
+ * <p>It can be used to avoid having to pass the password as an option of your 
pipeline. For this
+ * provider to work, the worker where the job runs needs to have the necessary 
credentials to access
+ * the secret. In Dataflow, this implies adding the necessary permissions to 
the worker service
+ * account. For other runners, set the credentials in the pipeline options 
using {@link
+ * org.apache.beam.sdk.extensions.gcp.options.GcpOptions}.
+ *
+ * <p>It also shows how to implement a {@link SessionServiceFactory} that 
depends on using external
+ * resources to retrieve the Solace session properties. In this case, using 
the Google Cloud Secrete
+ * Manager client.
+ *
+ * <p>Example of how to create the provider object:
+ *
+ * <pre>{@code
+ * GCPSecretSessionServiceFactory factory =
+ *     GCPSecretSessionServiceFactory.builder()
+ *         .username("user")
+ *         .host("host:port")
+ *         .passwordSecretName("secret-name")
+ *         .build();
+ *
+ * SessionService serviceUsingGCPSecret = factory.create();
+ * }</pre>
+ */
+@AutoValue
+public abstract class GCPSecretSessionServiceFactory extends 
SessionServiceFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(GCPSecretSessionServiceFactory.class);
+
+  private static final String PROJECT_NOT_FOUND = "PROJECT-NOT-FOUND";
+
+  public abstract String username();
+
+  public abstract String host();
+
+  public abstract String passwordSecretName();
+
+  public abstract String vpnName();
+
+  public abstract @Nullable String secretManagerProjectId();
+
+  public abstract String passwordSecretVersion();
+
+  public static GCPSecretSessionServiceFactory.Builder builder() {
+    return new AutoValue_GCPSecretSessionServiceFactory.Builder()
+        .passwordSecretVersion("latest")
+        .vpnName(DEFAULT_VPN_NAME);
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    /** Username to be used to authenticate with the broker. */
+    public abstract GCPSecretSessionServiceFactory.Builder username(String 
username);
+
+    /**
+     * The location of the broker, including port details if it is not 
listening in the default
+     * port.
+     */
+    public abstract GCPSecretSessionServiceFactory.Builder host(String host);
+
+    /** The Secret Manager secret name where the password is stored. */
+    public abstract GCPSecretSessionServiceFactory.Builder 
passwordSecretName(String name);
+
+    /** Optional. Solace broker VPN name. If not set, "default" is used. */
+    public abstract GCPSecretSessionServiceFactory.Builder vpnName(String 
name);
+
+    /**
+     * Optional for Dataflow or VMs running on Google Cloud. The project id of 
the project where the
+     * secret is stored. If not set, the project id where the job is running 
is used.
+     */
+    public abstract GCPSecretSessionServiceFactory.Builder 
secretManagerProjectId(String id);
+
+    /** Optional. Solace broker password secret version. If not set, "latest" 
is used. */
+    public abstract GCPSecretSessionServiceFactory.Builder 
passwordSecretVersion(String version);
+
+    public abstract GCPSecretSessionServiceFactory build();
+  }
+
+  @Override
+  public SessionService create() {
+    String password = null;
+    try {
+      password = retrieveSecret();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    BasicAuthJcsmpSessionServiceFactory factory =
+        BasicAuthJcsmpSessionServiceFactory.builder()
+            .username(username())
+            .host(host())
+            .password(password)
+            .vpnName(vpnName())
+            .build();
+
+    return factory.create();
+  }
+
+  private String retrieveSecret() throws IOException {
+    try (SecretManagerServiceClient client = 
SecretManagerServiceClient.create()) {
+      String projectId =
+          
Optional.ofNullable(secretManagerProjectId()).orElse(getProjectIdFromVmMetadata());
+      SecretVersionName secretVersionName =
+          SecretVersionName.of(projectId, passwordSecretName(), 
passwordSecretVersion());
+      return 
client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String getProjectIdFromVmMetadata() throws IOException {
+    URL metadataUrl =
+        new 
URL("http://metadata.google.internal/computeMetadata/v1/project/project-id";);
+    HttpURLConnection connection = (HttpURLConnection) 
metadataUrl.openConnection();
+    connection.setRequestProperty("Metadata-Flavor", "Google");
+
+    String output;
+    try (BufferedReader reader =
+        new BufferedReader(
+            new InputStreamReader(connection.getInputStream(), 
StandardCharsets.UTF_8))) {
+      output = reader.readLine();
+    }
+
+    if (output == null || output.isEmpty()) {
+      LOG.error(
+          "Cannot retrieve project id from VM metadata, please set a project 
id in your GoogleCloudSecretProvider.");
+    }
+    return output != null ? output : PROJECT_NOT_FOUND;
+  }
+}
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
index cd368865f0c..aed700a71de 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
@@ -17,34 +17,220 @@
  */
 package org.apache.beam.sdk.io.solace.broker;
 
+import com.solacesystems.jcsmp.JCSMPProperties;
 import java.io.Serializable;
+import org.apache.beam.sdk.io.solace.SolaceIO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The SessionService interface provides a set of methods for managing a 
session with the Solace
  * messaging system. It allows for establishing a connection, creating a 
message-receiver object,
  * checking if the connection is closed or not, and gracefully closing the 
session.
+ *
+ * <p>Override this class and the method {@link 
#initializeSessionProperties(JCSMPProperties)} with
+ * your specific properties, including all those related to authentication.
+ *
+ * <p>The connector will call the method only once per session created, so you 
can perform
+ * relatively heavy operations in that method (e.g. connect to a store or 
vault to retrieve
+ * credentials).
+ *
+ * <p>There are some default properties that are set by default and can be 
overridden in this
+ * provider, that are relevant for the writer connector, and not used in the 
case of the read
+ * connector (since they are not necessary for reading):
+ *
+ * <ul>
+ *   <li>VPN_NAME: default
+ *   <li>GENERATE_SEND_TIMESTAMPS: true
+ *   <li>PUB_MULTI_THREAD: true
+ * </ul>
+ *
+ * <p>The connector overrides other properties, regardless of what this 
provider sends to the
+ * connector. Those properties are the following. Again, these properties are 
only relevant for the
+ * write connector.
+ *
+ * <ul>
+ *   <li>PUB_ACK_WINDOW_SIZE
+ *   <li>MESSAGE_CALLBACK_ON_REACTOR
+ * </ul>
+ *
+ * Those properties are set by the connector based on the values of {@link
+ * 
org.apache.beam.sdk.io.solace.SolaceIO.Write#withWriterType(SolaceIO.WriterType)}
 and {@link
+ * 
org.apache.beam.sdk.io.solace.SolaceIO.Write#withSubmissionMode(SolaceIO.SubmissionMode)}.
+ *
+ * <p>The method will always run in a worker thread or task, and not in the 
driver program. If you
+ * need to access any resource to set the properties, you need to make sure 
that the worker has the
+ * network connectivity required for that, and that any credential or 
configuration is passed to the
+ * provider through the constructor.
+ *
+ * <p>The connector ensures that no two threads will be calling that method at 
the same time, so you
+ * don't have to take any specific precautions to avoid race conditions.
+ *
+ * <p>For basic authentication, use {@link BasicAuthJcsmpSessionService} and 
{@link
+ * BasicAuthJcsmpSessionServiceFactory}.
+ *
+ * <p>For other situations, you need to extend this class. For instance:
+ *
+ * <pre>{@code
+ * public class MySessionService extends SessionService {
+ *   private final String authToken;
+ *
+ *   public MySessionService(String token) {
+ *    this.oauthToken = token;
+ *    ...
+ *   }
+ *
+ *   {@literal }@Override
+ *   public JCSMPProperties initializeSessionProperties(JCSMPProperties 
baseProps) {
+ *     baseProps.setProperty(JCSMPProperties.AUTHENTICATION_SCHEME, 
JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2);
+ *     baseProps.setProperty(JCSMPProperties.OAUTH2_ACCESS_TOKEN, authToken);
+ *     return props;
+ *   }
+ *
+ *   {@literal }@Override
+ *   public void connect() {
+ *       ...
+ *   }
+ *
+ *   ...
+ * }
+ * }</pre>
  */
-public interface SessionService extends Serializable {
+public abstract class SessionService implements Serializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SessionService.class);
+
+  public static final String DEFAULT_VPN_NAME = "default";
+
+  private static final int STREAMING_PUB_ACK_WINDOW = 50;
+  private static final int BATCHED_PUB_ACK_WINDOW = 255;
 
   /**
    * Establishes a connection to the service. This could involve providing 
connection details like
    * host, port, VPN name, username, and password.
    */
-  void connect();
+  public abstract void connect();
 
   /** Gracefully closes the connection to the service. */
-  void close();
+  public abstract void close();
 
   /**
    * Checks whether the connection to the service is currently closed. This 
method is called when an
    * `UnboundedSolaceReader` is starting to read messages - a session will be 
created if this
    * returns true.
    */
-  boolean isClosed();
+  public abstract boolean isClosed();
 
   /**
    * Creates a MessageReceiver object for receiving messages from Solace. 
Typically, this object is
    * created from the session instance.
    */
-  MessageReceiver createReceiver();
+  public abstract MessageReceiver createReceiver();
+
+  /**
+   * Override this method and provide your specific properties, including all 
those related to
+   * authentication, and possibly others too. The {@code}baseProperties{@code} 
parameter sets the
+   * Solace VPN to "default" if none is specified.
+   *
+   * <p>You should add your properties to the parameter 
{@code}baseProperties{@code}, and return the
+   * result.
+   *
+   * <p>The method will be used whenever the session needs to be created or 
refreshed. If you are
+   * setting credentials with expiration, just make sure that the latest 
available credentials (e.g.
+   * renewed token) are set when the method is called.
+   *
+   * <p>For a list of all the properties that can be set, please check the 
following link:
+   *
+   * <ul>
+   *   <li><a
+   *       
href="https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html";>https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html</a>
+   * </ul>
+   */
+  public abstract JCSMPProperties initializeSessionProperties(JCSMPProperties 
baseProperties);
+
+  /**
+   * This method will be called by the write connector when a new session is 
started.
+   *
+   * <p>This call will happen in the worker, so you need to make sure that the 
worker has access to
+   * the resources you need to set the properties.
+   *
+   * <p>The call will happen only once per session initialization. Typically, 
that will be when the
+   * worker and the client are created. But if for any reason the session is 
lost (e.g. expired auth
+   * token), this method will be called again.
+   */
+  public final JCSMPProperties 
initializeWriteSessionProperties(SolaceIO.SubmissionMode mode) {
+    JCSMPProperties jcsmpProperties = 
initializeSessionProperties(getDefaultProperties());
+    return overrideConnectorProperties(jcsmpProperties, mode);
+  }
+
+  private static JCSMPProperties getDefaultProperties() {
+    JCSMPProperties props = new JCSMPProperties();
+    props.setProperty(JCSMPProperties.VPN_NAME, DEFAULT_VPN_NAME);
+    // Outgoing messages will have a sender timestamp field populated
+    props.setProperty(JCSMPProperties.GENERATE_SEND_TIMESTAMPS, true);
+    // Make XMLProducer safe to access from several threads. This is the 
default value, setting
+    // it just in case.
+    props.setProperty(JCSMPProperties.PUB_MULTI_THREAD, true);
+
+    return props;
+  }
+
+  /**
+   * This method overrides some properties for the broker session to prevent 
misconfiguration,
+   * taking into account how the write connector works.
+   */
+  private static JCSMPProperties overrideConnectorProperties(
+      JCSMPProperties props, SolaceIO.SubmissionMode mode) {
+
+    // PUB_ACK_WINDOW_SIZE heavily affects performance when publishing 
persistent
+    // messages. It can be a value between 1 and 255. This is the batch size 
for the ack
+    // received from Solace. A value of 1 will have the lowest latency, but a 
very low
+    // throughput and a monumental backpressure.
+
+    // This controls how the messages are sent to Solace
+    if (mode == SolaceIO.SubmissionMode.HIGHER_THROUGHPUT) {
+      // Create a parallel thread and a queue to send the messages
+
+      Boolean msgCbProp = 
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR);
+      if (msgCbProp != null && msgCbProp) {
+        LOG.warn(
+            "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to false 
since"
+                + " HIGHER_THROUGHPUT mode was selected");
+      }
+
+      props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, false);
+
+      Integer ackWindowSize = 
props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
+      if ((ackWindowSize != null && ackWindowSize != BATCHED_PUB_ACK_WINDOW)) {
+        LOG.warn(
+            String.format(
+                "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
+                    + " HIGHER_THROUGHPUT mode was selected",
+                BATCHED_PUB_ACK_WINDOW));
+      }
+      props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, 
BATCHED_PUB_ACK_WINDOW);
+    } else {
+      // Send from the same thread where the produced is being called. This 
offers the lowest
+      // latency, but a low throughput too.
+      Boolean msgCbProp = 
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR);
+      if (msgCbProp != null && !msgCbProp) {
+        LOG.warn(
+            "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to true 
since"
+                + " LOWER_LATENCY mode was selected");
+      }
+
+      props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true);
+
+      Integer ackWindowSize = 
props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
+      if ((ackWindowSize != null && ackWindowSize != 
STREAMING_PUB_ACK_WINDOW)) {
+        LOG.warn(
+            String.format(
+                "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
+                    + " LOWER_LATENCY mode was selected",
+                STREAMING_PUB_ACK_WINDOW));
+      }
+
+      props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, 
STREAMING_PUB_ACK_WINDOW);
+    }
+    return props;
+  }
 }
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
index 7d1dee7a118..027de2cff13 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
@@ -26,9 +26,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * queue property and mandates the implementation of a create() method in 
concrete subclasses.
  */
 public abstract class SessionServiceFactory implements Serializable {
-
   /**
-   * A reference to a Queue object. This is set when the pipline is 
constructed (in the {@link
+   * A reference to a Queue object. This is set when the pipeline is 
constructed (in the {@link
    * 
org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)}
 method).
    * This could be used to associate the created SessionService with a 
specific queue for message
    * handling.
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
index 285c1cb8a7e..ec0ae719468 100644
--- 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
@@ -17,10 +17,11 @@
  */
 package org.apache.beam.sdk.io.solace;
 
+import com.solacesystems.jcsmp.JCSMPProperties;
 import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
 import org.apache.beam.sdk.io.solace.broker.SessionService;
 
-public class MockEmptySessionService implements SessionService {
+public class MockEmptySessionService extends SessionService {
 
   String exceptionMessage = "This is an empty client, use a MockSessionService 
instead.";
 
@@ -43,4 +44,9 @@ public class MockEmptySessionService implements 
SessionService {
   public void connect() {
     throw new UnsupportedOperationException(exceptionMessage);
   }
+
+  @Override
+  public JCSMPProperties initializeSessionProperties(JCSMPProperties 
baseProperties) {
+    throw new UnsupportedOperationException(exceptionMessage);
+  }
 }
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
index 7b14da138c6..207cfef9c62 100644
--- 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
@@ -18,23 +18,35 @@
 package org.apache.beam.sdk.io.solace;
 
 import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.JCSMPProperties;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
 import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
 import org.apache.beam.sdk.io.solace.broker.SessionService;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
-public class MockSessionService implements SessionService {
+public class MockSessionService extends SessionService {
 
   private final SerializableFunction<Integer, BytesXMLMessage> getRecordFn;
   private MessageReceiver messageReceiver = null;
   private final int minMessagesReceived;
+  private final @Nullable SubmissionMode mode;
 
   public MockSessionService(
-      SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int 
minMessagesReceived) {
+      SerializableFunction<Integer, BytesXMLMessage> getRecordFn,
+      int minMessagesReceived,
+      @Nullable SubmissionMode mode) {
     this.getRecordFn = getRecordFn;
     this.minMessagesReceived = minMessagesReceived;
+    this.mode = mode;
+  }
+
+  public MockSessionService(
+      SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int 
minMessagesReceived) {
+    this(getRecordFn, minMessagesReceived, null);
   }
 
   @Override
@@ -85,4 +97,16 @@ public class MockSessionService implements SessionService {
       return counter.get() >= minMessagesReceived;
     }
   }
+
+  @Override
+  public JCSMPProperties initializeSessionProperties(JCSMPProperties 
baseProperties) {
+    // Let's override some properties that will be overriden by the connector
+    // Opposite of the mode, to test that is overriden
+    baseProperties.setProperty(
+        JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, mode == 
SubmissionMode.HIGHER_THROUGHPUT);
+
+    baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, 87);
+
+    return baseProperties;
+  }
 }
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/BasicAuthWriterSessionTest.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/BasicAuthWriterSessionTest.java
new file mode 100644
index 00000000000..e33917641e3
--- /dev/null
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/BasicAuthWriterSessionTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static 
org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
+import static org.junit.Assert.assertEquals;
+
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.JCSMPProperties;
+import com.solacesystems.jcsmp.Queue;
+import org.apache.beam.sdk.io.solace.SolaceIO;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BasicAuthWriterSessionTest {
+  private final String username = "Some Username";
+  private final String password = "Some Password";
+  private final String host = "Some Host";
+  private final String vpn = "Some non default VPN";
+  SessionService withVpn;
+  SessionService withoutVpn;
+
+  @Before
+  public void setUp() throws Exception {
+    Queue q = JCSMPFactory.onlyInstance().createQueue("test-queue");
+
+    BasicAuthJcsmpSessionServiceFactory factoryWithVpn =
+        BasicAuthJcsmpSessionServiceFactory.builder()
+            .username(username)
+            .password(password)
+            .host(host)
+            .vpnName(vpn)
+            .build();
+    factoryWithVpn.setQueue(q);
+    withVpn = factoryWithVpn.create();
+
+    BasicAuthJcsmpSessionServiceFactory factoryNoVpn =
+        BasicAuthJcsmpSessionServiceFactory.builder()
+            .username(username)
+            .password(password)
+            .host(host)
+            .build();
+    factoryNoVpn.setQueue(q);
+    withoutVpn = factoryNoVpn.create();
+  }
+
+  @Test
+  public void testAuthProperties() {
+    SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.HIGHER_THROUGHPUT;
+    JCSMPProperties props = withoutVpn.initializeWriteSessionProperties(mode);
+    assertEquals(username, props.getStringProperty(JCSMPProperties.USERNAME));
+    assertEquals(password, props.getStringProperty(JCSMPProperties.PASSWORD));
+    assertEquals(host, props.getStringProperty(JCSMPProperties.HOST));
+    assertEquals(
+        JCSMPProperties.AUTHENTICATION_SCHEME_BASIC,
+        props.getStringProperty(JCSMPProperties.AUTHENTICATION_SCHEME));
+  }
+
+  @Test
+  public void testVpnNames() {
+    SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.LOWER_LATENCY;
+    JCSMPProperties propsWithoutVpn = 
withoutVpn.initializeWriteSessionProperties(mode);
+    assertEquals(DEFAULT_VPN_NAME, 
propsWithoutVpn.getStringProperty(JCSMPProperties.VPN_NAME));
+    JCSMPProperties propsWithVpn = 
withVpn.initializeWriteSessionProperties(mode);
+    assertEquals(vpn, 
propsWithVpn.getStringProperty(JCSMPProperties.VPN_NAME));
+  }
+
+  @Test
+  public void testOverrideWithHigherThroughput() {
+    SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.HIGHER_THROUGHPUT;
+    JCSMPProperties props = withoutVpn.initializeWriteSessionProperties(mode);
+
+    assertEquals(false, 
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR));
+    assertEquals(
+        Long.valueOf(255),
+        
Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE)));
+  }
+
+  @Test
+  public void testOverrideWithLowerLatency() {
+    SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.LOWER_LATENCY;
+    JCSMPProperties props = withoutVpn.initializeWriteSessionProperties(mode);
+    assertEquals(true, 
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR));
+    assertEquals(
+        Long.valueOf(50),
+        
Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE)));
+  }
+}
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java
new file mode 100644
index 00000000000..0c6f88a7c9d
--- /dev/null
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static org.junit.Assert.assertEquals;
+
+import com.solacesystems.jcsmp.JCSMPProperties;
+import org.apache.beam.sdk.io.solace.MockSessionService;
+import org.apache.beam.sdk.io.solace.SolaceIO;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OverrideWriterPropertiesTest {
+  @Test
+  public void testOverrideForHigherThroughput() {
+    SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.HIGHER_THROUGHPUT;
+    MockSessionService service = new MockSessionService(null, 0, mode);
+
+    // Test HIGHER_THROUGHPUT mode
+    JCSMPProperties props = service.initializeWriteSessionProperties(mode);
+    assertEquals(false, 
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR));
+    assertEquals(
+        Long.valueOf(255),
+        
Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE)));
+  }
+
+  @Test
+  public void testOverrideForLowerLatency() {
+    SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.LOWER_LATENCY;
+    MockSessionService service = new MockSessionService(null, 0, mode);
+
+    // Test HIGHER_THROUGHPUT mode
+    JCSMPProperties props = service.initializeWriteSessionProperties(mode);
+    assertEquals(true, 
props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR));
+    assertEquals(
+        Long.valueOf(50),
+        
Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE)));
+  }
+}


Reply via email to