This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3b8ddda10a0 Solace Read connector: adding Basic Authentication support
(#31541)
3b8ddda10a0 is described below
commit 3b8ddda10a01ff640ed4cf1ce746d0c19e003180
Author: Bartosz Zablocki <[email protected]>
AuthorDate: Mon Jul 1 23:24:57 2024 +0200
Solace Read connector: adding Basic Authentication support (#31541)
* Add support for BasicAuth to Solace
* Address PR comments
* Use `checkStateNotNull`
---
.../broker/BasicAuthJcsmpSessionService.java | 148 +++++++++++++++++++++
.../BasicAuthJcsmpSessionServiceFactory.java | 74 +++++++++++
.../io/solace/broker/SolaceMessageReceiver.java | 72 ++++++++++
3 files changed, 294 insertions(+)
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
new file mode 100644
index 00000000000..7863dbd129c
--- /dev/null
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.solacesystems.jcsmp.ConsumerFlowProperties;
+import com.solacesystems.jcsmp.EndpointProperties;
+import com.solacesystems.jcsmp.FlowReceiver;
+import com.solacesystems.jcsmp.InvalidPropertiesException;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.JCSMPProperties;
+import com.solacesystems.jcsmp.JCSMPSession;
+import com.solacesystems.jcsmp.Queue;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.solace.RetryCallableManager;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * A class that manages a connection to a Solace broker using basic
authentication.
+ *
+ * <p>This class provides a way to connect to a Solace broker and receive
messages from a queue. The
+ * connection is established using basic authentication.
+ */
+public class BasicAuthJcsmpSessionService implements SessionService {
+ private final String queueName;
+ private final String host;
+ private final String username;
+ private final String password;
+ private final String vpnName;
+ @Nullable private JCSMPSession jcsmpSession;
+ private final RetryCallableManager retryCallableManager =
RetryCallableManager.create();
+
+ /**
+ * Creates a new {@link BasicAuthJcsmpSessionService} with the given
parameters.
+ *
+ * @param queueName The name of the queue to receive messages from.
+ * @param host The host name or IP address of the Solace broker. Format:
Host[:Port]
+ * @param username The username to use for authentication.
+ * @param password The password to use for authentication.
+ * @param vpnName The name of the VPN to connect to.
+ */
+ public BasicAuthJcsmpSessionService(
+ String queueName, String host, String username, String password, String
vpnName) {
+ this.queueName = queueName;
+ this.host = host;
+ this.username = username;
+ this.password = password;
+ this.vpnName = vpnName;
+ }
+
+ @Override
+ public void connect() {
+ retryCallableManager.retryCallable(this::connectSession,
ImmutableSet.of(JCSMPException.class));
+ }
+
+ @Override
+ public void close() {
+ if (isClosed()) {
+ return;
+ }
+ retryCallableManager.retryCallable(
+ () -> {
+ checkStateNotNull(jcsmpSession).closeSession();
+ return 0;
+ },
+ ImmutableSet.of(IOException.class));
+ }
+
+ @Override
+ public MessageReceiver createReceiver() {
+ return retryCallableManager.retryCallable(
+ this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
+ }
+
+ @Override
+ public boolean isClosed() {
+ return jcsmpSession == null || jcsmpSession.isClosed();
+ }
+
+ private MessageReceiver createFlowReceiver() throws JCSMPException,
IOException {
+ if (isClosed()) {
+ connectSession();
+ }
+
+ Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName);
+
+ ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
+ flowProperties.setEndpoint(queue);
+ flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);
+
+ EndpointProperties endpointProperties = new EndpointProperties();
+
endpointProperties.setAccessType(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE);
+ if (jcsmpSession != null) {
+ return new SolaceMessageReceiver(
+ createFlowReceiver(jcsmpSession, flowProperties,
endpointProperties));
+ }
+ throw new IOException(
+ "SolaceIO.Read: Could not create a receiver from the Jcsmp session:
session object is null.");
+ }
+
+ // The `@SuppressWarning` is needed here, because the checkerframework
reports an error for the
+ // first argument of the `createFlow` being null, even though the
documentation allows it:
+ //
https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPSession.html#createFlow-com.solacesystems.jcsmp.XMLMessageListener-com.solacesystems.jcsmp.ConsumerFlowProperties-com.solacesystems.jcsmp.EndpointProperties-
+ @SuppressWarnings("nullness")
+ private static FlowReceiver createFlowReceiver(
+ JCSMPSession jcsmpSession,
+ ConsumerFlowProperties flowProperties,
+ EndpointProperties endpointProperties)
+ throws JCSMPException {
+ return jcsmpSession.createFlow(null, flowProperties, endpointProperties);
+ }
+
+ private int connectSession() throws JCSMPException {
+ if (jcsmpSession == null) {
+ jcsmpSession = createSessionObject();
+ }
+ jcsmpSession.connect();
+ return 0;
+ }
+
+ private JCSMPSession createSessionObject() throws InvalidPropertiesException
{
+ JCSMPProperties properties = new JCSMPProperties();
+ properties.setProperty(JCSMPProperties.HOST, host);
+ properties.setProperty(JCSMPProperties.USERNAME, username);
+ properties.setProperty(JCSMPProperties.PASSWORD, password);
+ properties.setProperty(JCSMPProperties.VPN_NAME, vpnName);
+
+ return JCSMPFactory.onlyInstance().createSession(properties);
+ }
+}
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
new file mode 100644
index 00000000000..8cb4ff0af05
--- /dev/null
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * A factory for creating {@link BasicAuthJcsmpSessionService} instances.
Extends {@link
+ * SessionServiceFactory}.
+ *
+ * <p>This factory provides a way to create {@link
BasicAuthJcsmpSessionService} instances with
+ * authenticate to Solace with Basic Authentication.
+ */
+@AutoValue
+public abstract class BasicAuthJcsmpSessionServiceFactory extends
SessionServiceFactory {
+ public abstract String host();
+
+ public abstract String username();
+
+ public abstract String password();
+
+ public abstract String vpnName();
+
+ public static Builder builder() {
+ return new AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ /**
+ * Set Solace host, format: Host[:Port] e.g. "12.34.56.78", or
"[fe80::1]", or
+ * "12.34.56.78:4444".
+ */
+ public abstract Builder host(String host);
+
+ /** Set Solace username. */
+ public abstract Builder username(String username);
+ /** Set Solace password. */
+ public abstract Builder password(String password);
+
+ /** Set Solace vpn name. */
+ public abstract Builder vpnName(String vpnName);
+
+ public abstract BasicAuthJcsmpSessionServiceFactory build();
+ }
+
+ @Override
+ public SessionService create() {
+ return new BasicAuthJcsmpSessionService(
+ checkStateNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(),
+ host(),
+ username(),
+ password(),
+ vpnName());
+ }
+}
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java
new file mode 100644
index 00000000000..e5f129d3ddf
--- /dev/null
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.FlowReceiver;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.StaleSessionException;
+import java.io.IOException;
+import org.apache.beam.sdk.io.solace.RetryCallableManager;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SolaceMessageReceiver implements MessageReceiver {
+ private static final Logger LOG =
LoggerFactory.getLogger(SolaceMessageReceiver.class);
+
+ public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100;
+ private final FlowReceiver flowReceiver;
+ private final RetryCallableManager retryCallableManager =
RetryCallableManager.create();
+
+ public SolaceMessageReceiver(FlowReceiver flowReceiver) {
+ this.flowReceiver = flowReceiver;
+ }
+
+ @Override
+ public void start() {
+ startFlowReceiver();
+ }
+
+ private void startFlowReceiver() {
+ retryCallableManager.retryCallable(
+ () -> {
+ flowReceiver.start();
+ return 0;
+ },
+ ImmutableSet.of(JCSMPException.class));
+ }
+
+ @Override
+ public boolean isClosed() {
+ return flowReceiver == null || flowReceiver.isClosed();
+ }
+
+ @Override
+ public BytesXMLMessage receive() throws IOException {
+ try {
+ return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS);
+ } catch (StaleSessionException e) {
+ LOG.warn("SolaceIO: Caught StaleSessionException, restarting the
FlowReceiver.");
+ startFlowReceiver();
+ throw new IOException(e);
+ } catch (JCSMPException e) {
+ throw new IOException(e);
+ }
+ }
+}