This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a88f1583f0 Solace Read connector: adding implementations of 
SempClient and SempClientFactory (#31542)
8a88f1583f0 is described below

commit 8a88f1583f0d189d3f58c39db4b40f9bbc789684
Author: Bartosz Zablocki <[email protected]>
AuthorDate: Mon Jul 8 20:02:04 2024 +0200

    Solace Read connector: adding implementations of SempClient and 
SempClientFactory (#31542)
    
    * Adding implementations of SempClient and SempClientFactory
    
    * Use core SerializableSupplier, remove SuppressWarnings, extract number to 
a variable, cookie store map
---
 sdks/java/io/solace/build.gradle                   |   4 +
 .../sdk/io/solace/broker/BasicAuthSempClient.java  | 102 ++++++++++
 .../solace/broker/BasicAuthSempClientFactory.java  |  82 ++++++++
 .../beam/sdk/io/solace/broker/BrokerResponse.java  |  62 ++++++
 .../solace/broker/SempBasicAuthClientExecutor.java | 217 +++++++++++++++++++++
 .../org/apache/beam/sdk/io/solace/data/Semp.java   |  74 +++++++
 .../broker/SempBasicAuthClientExecutorTest.java    | 202 +++++++++++++++++++
 7 files changed, 743 insertions(+)

diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle
index 7c643dc9127..2d720ab7d92 100644
--- a/sdks/java/io/solace/build.gradle
+++ b/sdks/java/io/solace/build.gradle
@@ -43,6 +43,10 @@ dependencies {
     implementation library.java.google_api_common
     implementation library.java.gax
     implementation library.java.threetenbp
+    implementation library.java.google_http_client
+    implementation library.java.google_http_client_gson
+    implementation library.java.jackson_core
+    implementation library.java.jackson_databind
 
     testImplementation library.java.junit
     testImplementation project(path: ":sdks:java:io:common")
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java
new file mode 100644
index 00000000000..4884bb61e62
--- /dev/null
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.http.HttpRequestFactory;
+import com.solacesystems.jcsmp.JCSMPFactory;
+import java.io.IOException;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.data.Semp.Queue;
+import org.apache.beam.sdk.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class that manages REST calls to the Solace Element Management Protocol 
(SEMP) using basic
+ * authentication.
+ *
+ * <p>This class provides methods to check necessary information, such as if 
the queue is
+ * non-exclusive, remaining backlog bytes of a queue. It can also create and 
execute calls to create
+ * queue for a topic.
+ */
+@Internal
+public class BasicAuthSempClient implements SempClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BasicAuthSempClient.class);
+  private final ObjectMapper objectMapper =
+      new 
ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+
+  private final SempBasicAuthClientExecutor sempBasicAuthClientExecutor;
+
+  public BasicAuthSempClient(
+      String host,
+      String username,
+      String password,
+      String vpnName,
+      SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier) {
+    sempBasicAuthClientExecutor =
+        new SempBasicAuthClientExecutor(
+            host, username, password, vpnName, 
httpRequestFactorySupplier.get());
+  }
+
+  @Override
+  public boolean isQueueNonExclusive(String queueName) throws IOException {
+    LOG.info("SolaceIO.Read: SempOperations: query SEMP if queue {} is 
nonExclusive", queueName);
+    BrokerResponse response = 
sempBasicAuthClientExecutor.getQueueResponse(queueName);
+    if (response.content == null) {
+      throw new IOException("SolaceIO: response from SEMP is empty!");
+    }
+    Queue q = mapJsonToClass(response.content, Queue.class);
+    return q.data().accessType().equals("non-exclusive");
+  }
+
+  @Override
+  public com.solacesystems.jcsmp.Queue createQueueForTopic(String queueName, 
String topicName)
+      throws IOException {
+    createQueue(queueName);
+    createSubscription(queueName, topicName);
+    return JCSMPFactory.onlyInstance().createQueue(queueName);
+  }
+
+  @Override
+  public long getBacklogBytes(String queueName) throws IOException {
+    BrokerResponse response = 
sempBasicAuthClientExecutor.getQueueResponse(queueName);
+    if (response.content == null) {
+      throw new IOException("SolaceIO: response from SEMP is empty!");
+    }
+    Queue q = mapJsonToClass(response.content, Queue.class);
+    return q.data().msgSpoolUsage();
+  }
+
+  private void createQueue(String queueName) throws IOException {
+    LOG.info("SolaceIO.Read: Creating new queue {}.", queueName);
+    sempBasicAuthClientExecutor.createQueueResponse(queueName);
+  }
+
+  private void createSubscription(String queueName, String topicName) throws 
IOException {
+    LOG.info("SolaceIO.Read: Creating new subscription {} for topic {}.", 
queueName, topicName);
+    sempBasicAuthClientExecutor.createSubscriptionResponse(queueName, 
topicName);
+  }
+
+  private <T> T mapJsonToClass(String content, Class<T> mapSuccessToClass)
+      throws JsonProcessingException {
+    return objectMapper.readValue(content, mapSuccessToClass);
+  }
+}
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java
new file mode 100644
index 00000000000..4c01257373b
--- /dev/null
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.util.SerializableSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A factory for creating {@link BasicAuthSempClient} instances.
+ *
+ * <p>This factory provides a way to create {@link BasicAuthSempClient} 
instances with different
+ * configurations.
+ */
+@AutoValue
+public abstract class BasicAuthSempClientFactory implements SempClientFactory {
+
+  abstract String host();
+
+  abstract String username();
+
+  abstract String password();
+
+  abstract String vpnName();
+
+  abstract @Nullable SerializableSupplier<HttpRequestFactory> 
httpRequestFactorySupplier();
+
+  public static Builder builder() {
+    return new AutoValue_BasicAuthSempClientFactory.Builder();
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    /** Set Solace SEMP host, format: [Protocol://]Host[:Port]. e.g. 
"http://127.0.0.1:8080"; */
+    public abstract Builder host(String host);
+
+    /** Set Solace username. */
+    public abstract Builder username(String username);
+    /** Set Solace password. */
+    public abstract Builder password(String password);
+
+    /** Set Solace vpn name. */
+    public abstract Builder vpnName(String vpnName);
+
+    @VisibleForTesting
+    abstract Builder httpRequestFactorySupplier(
+        SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier);
+
+    public abstract BasicAuthSempClientFactory build();
+  }
+
+  @Override
+  public SempClient create() {
+    return new BasicAuthSempClient(
+        host(), username(), password(), vpnName(), 
getHttpRequestFactorySupplier());
+  }
+
+  SerializableSupplier<HttpRequestFactory> getHttpRequestFactorySupplier() {
+    SerializableSupplier<HttpRequestFactory> httpRequestSupplier = 
httpRequestFactorySupplier();
+    return httpRequestSupplier != null
+        ? httpRequestSupplier
+        : () -> new NetHttpTransport().createRequestFactory();
+  }
+}
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BrokerResponse.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BrokerResponse.java
new file mode 100644
index 00000000000..1a47f801228
--- /dev/null
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BrokerResponse.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.google.api.client.http.HttpResponse;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Collectors;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class BrokerResponse {
+  final int code;
+  final String message;
+  @Nullable String content;
+
+  public BrokerResponse(int responseCode, String message, @Nullable 
InputStream content) {
+    this.code = responseCode;
+    this.message = message;
+    if (content != null) {
+      this.content =
+          new BufferedReader(new InputStreamReader(content, 
StandardCharsets.UTF_8))
+              .lines()
+              .collect(Collectors.joining("\n"));
+    }
+  }
+
+  public static BrokerResponse fromHttpResponse(HttpResponse response) throws 
IOException {
+    return new BrokerResponse(
+        response.getStatusCode(), response.getStatusMessage(), 
response.getContent());
+  }
+
+  @Override
+  public String toString() {
+    return "BrokerResponse{"
+        + "code="
+        + code
+        + ", message='"
+        + message
+        + '\''
+        + ", content="
+        + content
+        + '}';
+  }
+}
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java
new file mode 100644
index 00000000000..62a492775e7
--- /dev/null
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpContent;
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.json.JsonHttpContent;
+import com.google.api.client.json.gson.GsonFactory;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.CookieManager;
+import java.net.HttpCookie;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A class to execute requests to SEMP v2 with Basic Auth authentication.
+ *
+ * <p>This approach takes advantage of <a
+ * href="https://docs.solace.com/Admin/SEMP/SEMP-Security.htm#Sessions";>SEMP 
Sessions</a>. The
+ * session is established when a user authenticates with HTTP Basic 
authentication. When the
+ * response is 401 Unauthorized, the client will execute an additional request 
with Basic Auth
+ * header to refresh the token.
+ */
+class SempBasicAuthClientExecutor implements Serializable {
+  // Every request will be repeated 2 times in case of abnormal connection 
failures.
+  private static final int REQUEST_NUM_RETRIES = 2;
+  private static final Map<CookieManagerKey, CookieManager> COOKIE_MANAGER_MAP 
=
+      new ConcurrentHashMap<CookieManagerKey, CookieManager>();
+  private static final String COOKIES_HEADER = "Set-Cookie";
+
+  private final String username;
+  private final String messageVpn;
+  private final String baseUrl;
+  private final String password;
+  private final CookieManagerKey cookieManagerKey;
+  private final transient HttpRequestFactory requestFactory;
+
+  SempBasicAuthClientExecutor(
+      String host,
+      String username,
+      String password,
+      String vpnName,
+      HttpRequestFactory httpRequestFactory) {
+    this.baseUrl = String.format("%s/SEMP/v2", host);
+    this.username = username;
+    this.messageVpn = vpnName;
+    this.password = password;
+    this.requestFactory = httpRequestFactory;
+    this.cookieManagerKey = new CookieManagerKey(this.baseUrl, this.username);
+    COOKIE_MANAGER_MAP.putIfAbsent(this.cookieManagerKey, new CookieManager());
+  }
+
+  private static String getQueueEndpoint(String messageVpn, String queueName) {
+    return String.format("/monitor/msgVpns/%s/queues/%s", messageVpn, 
queueName);
+  }
+
+  private static String createQueueEndpoint(String messageVpn) {
+    return String.format("/config/msgVpns/%s/queues", messageVpn);
+  }
+
+  private static String subscriptionEndpoint(String messageVpn, String 
queueName) {
+    return String.format("/config/msgVpns/%s/queues/%s/subscriptions", 
messageVpn, queueName);
+  }
+
+  BrokerResponse getQueueResponse(String queueName) throws IOException {
+    String queryUrl = getQueueEndpoint(messageVpn, queueName);
+    HttpResponse response = executeGet(new GenericUrl(baseUrl + queryUrl));
+    return BrokerResponse.fromHttpResponse(response);
+  }
+
+  BrokerResponse createQueueResponse(String queueName) throws IOException {
+    String queryUrl = createQueueEndpoint(messageVpn);
+    ImmutableMap<String, Object> params =
+        ImmutableMap.<String, Object>builder()
+            .put("accessType", "non-exclusive")
+            .put("queueName", queueName)
+            .put("owner", username)
+            .put("permission", "consume")
+            .put("ingressEnabled", true)
+            .put("egressEnabled", true)
+            .build();
+
+    HttpResponse response = executePost(new GenericUrl(baseUrl + queryUrl), 
params);
+    return BrokerResponse.fromHttpResponse(response);
+  }
+
+  BrokerResponse createSubscriptionResponse(String queueName, String 
topicName) throws IOException {
+    String queryUrl = subscriptionEndpoint(messageVpn, queueName);
+
+    ImmutableMap<String, Object> params =
+        ImmutableMap.<String, Object>builder()
+            .put("subscriptionTopic", topicName)
+            .put("queueName", queueName)
+            .build();
+    HttpResponse response = executePost(new GenericUrl(baseUrl + queryUrl), 
params);
+    return BrokerResponse.fromHttpResponse(response);
+  }
+
+  private HttpResponse executeGet(GenericUrl url) throws IOException {
+    HttpRequest request = requestFactory.buildGetRequest(url);
+    return execute(request);
+  }
+
+  private HttpResponse executePost(GenericUrl url, ImmutableMap<String, 
Object> parameters)
+      throws IOException {
+    HttpContent content = new 
JsonHttpContent(GsonFactory.getDefaultInstance(), parameters);
+    HttpRequest request = requestFactory.buildPostRequest(url, content);
+    return execute(request);
+  }
+
+  private HttpResponse execute(HttpRequest request) throws IOException {
+    request.setNumberOfRetries(REQUEST_NUM_RETRIES);
+    HttpHeaders httpHeaders = new HttpHeaders();
+    boolean authFromCookie =
+        !checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey))
+            .getCookieStore()
+            .getCookies()
+            .isEmpty();
+    if (authFromCookie) {
+      setCookiesFromCookieManager(httpHeaders);
+      request.setHeaders(httpHeaders);
+    } else {
+      httpHeaders.setBasicAuthentication(username, password);
+      request.setHeaders(httpHeaders);
+    }
+
+    HttpResponse response;
+    try {
+      response = request.execute();
+    } catch (HttpResponseException e) {
+      if (authFromCookie && e.getStatusCode() == 401) {
+        
checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey)).getCookieStore().removeAll();
+        // execute again without cookies to refresh the token.
+        return execute(request);
+      } else { // we might need to handle other response codes here.
+        throw e;
+      }
+    }
+
+    storeCookiesInCookieManager(response.getHeaders());
+    return response;
+  }
+
+  private void setCookiesFromCookieManager(HttpHeaders httpHeaders) {
+    httpHeaders.setCookie(
+        
checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey)).getCookieStore().getCookies()
+            .stream()
+            .map(s -> s.getName() + "=" + s.getValue())
+            .collect(Collectors.joining(";")));
+  }
+
+  private void storeCookiesInCookieManager(HttpHeaders headers) {
+    List<String> cookiesHeader = headers.getHeaderStringValues(COOKIES_HEADER);
+    if (cookiesHeader != null) {
+      for (String cookie : cookiesHeader) {
+        checkStateNotNull(COOKIE_MANAGER_MAP.get(cookieManagerKey))
+            .getCookieStore()
+            .add(null, HttpCookie.parse(cookie).get(0));
+      }
+    }
+  }
+
+  private static class CookieManagerKey implements Serializable {
+    private final String baseUrl;
+    private final String username;
+
+    CookieManagerKey(String baseUrl, String username) {
+      this.baseUrl = baseUrl;
+      this.username = username;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof CookieManagerKey)) {
+        return false;
+      }
+      CookieManagerKey that = (CookieManagerKey) o;
+      return Objects.equals(baseUrl, that.baseUrl) && Objects.equals(username, 
that.username);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(baseUrl, username);
+    }
+  }
+}
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Semp.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Semp.java
new file mode 100644
index 00000000000..f6f0fb51d22
--- /dev/null
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Semp.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.data;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.google.auto.value.AutoValue;
+
+public class Semp {
+
+  @AutoValue
+  @JsonSerialize(as = Queue.class)
+  @JsonDeserialize(builder = AutoValue_Semp_Queue.Builder.class)
+  public abstract static class Queue {
+
+    public abstract QueueData data();
+
+    public static Builder builder() {
+      return new AutoValue_Semp_Queue.Builder();
+    }
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    @JsonPOJOBuilder(withPrefix = "set")
+    abstract static class Builder {
+
+      public abstract Builder setData(QueueData queueData);
+
+      public abstract Queue build();
+    }
+  }
+
+  @AutoValue
+  @JsonDeserialize(builder = AutoValue_Semp_QueueData.Builder.class)
+  public abstract static class QueueData {
+    public abstract String accessType();
+
+    public abstract long msgSpoolUsage();
+
+    public static Builder builder() {
+      return new AutoValue_Semp_QueueData.Builder();
+    }
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    @JsonPOJOBuilder(withPrefix = "set")
+    abstract static class Builder {
+
+      public abstract Builder setAccessType(String accessType);
+
+      public abstract Builder setMsgSpoolUsage(long msgSpoolUsage);
+
+      public abstract QueueData build();
+    }
+  }
+}
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java
new file mode 100644
index 00000000000..8cc48ed17ef
--- /dev/null
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.Json;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import java.io.IOException;
+import java.util.List;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+public class SempBasicAuthClientExecutorTest {
+
+  @Test
+  public void testExecuteStatus4xx() {
+    MockHttpTransport transport =
+        new MockHttpTransport() {
+          @Override
+          public LowLevelHttpRequest buildRequest(String method, String url) {
+            return new MockLowLevelHttpRequest() {
+              @Override
+              public LowLevelHttpResponse execute() {
+                MockLowLevelHttpResponse response = new 
MockLowLevelHttpResponse();
+                response.setStatusCode(404);
+                response.setContentType(Json.MEDIA_TYPE);
+                response.setContent(
+                    "{\"meta\":{\"error\":{\"code\":404,\"description\":\"some"
+                        + " error\",\"status\":\"xx\"}}}");
+                return response;
+              }
+            };
+          }
+        };
+
+    HttpRequestFactory requestFactory = transport.createRequestFactory();
+    SempBasicAuthClientExecutor client =
+        new SempBasicAuthClientExecutor(
+            "http://host";, "username", "password", "vpnName", requestFactory);
+
+    assertThrows(HttpResponseException.class, () -> 
client.getQueueResponse("queue"));
+  }
+
+  @Test
+  public void testExecuteStatus3xx() {
+    MockHttpTransport transport =
+        new MockHttpTransport() {
+          @Override
+          public LowLevelHttpRequest buildRequest(String method, String url) {
+            return new MockLowLevelHttpRequest() {
+              @Override
+              public LowLevelHttpResponse execute() {
+                MockLowLevelHttpResponse response = new 
MockLowLevelHttpResponse();
+                response.setStatusCode(301);
+                response.setContentType(Json.MEDIA_TYPE);
+                response.setContent(
+                    "{\"meta\":{\"error\":{\"code\":301,\"description\":\"some"
+                        + " error\",\"status\":\"xx\"}}}");
+                return response;
+              }
+            };
+          }
+        };
+
+    HttpRequestFactory requestFactory = transport.createRequestFactory();
+    SempBasicAuthClientExecutor client =
+        new SempBasicAuthClientExecutor(
+            "http://host";, "username", "password", "vpnName", requestFactory);
+
+    assertThrows(HttpResponseException.class, () -> 
client.getQueueResponse("queue"));
+  }
+
+  /**
+   * In this test case, we test a situation when a session that we used to 
authenticate to Semp
+   * expires.
+   *
+   * <p>To test this scenario, we need to do the following:
+   *
+   * <ol>
+   *   <li>Send the first request, to initialize a session. This request has 
to contain the Basic
+   *       Auth header and should not include any cookie headers. The response 
for this request
+   *       contains a session cookie we can re-use in the following requests.
+   *   <li>Send the second request - this request should use a cookie from the 
previous response.
+   *       There should be no Authorization header. To simulate an expired 
session scenario, we set
+   *       the response of this request to the "401 Unauthorized". This should 
cause a the request
+   *       to be retried, this time with the Authorization header.
+   *   <li>Validate the third request to contain the Basic Auth header and no 
session cookies.
+   * </ol>
+   */
+  @Test
+  public void testExecuteWithUnauthorized() throws IOException {
+    // Making it a final array, so that we can reference it from within the 
MockHttpTransport
+    // instance
+    final int[] requestCounter = {0};
+    MockHttpTransport transport =
+        new MockHttpTransport() {
+          @Override
+          public LowLevelHttpRequest buildRequest(String method, String url) {
+            return new MockLowLevelHttpRequest() {
+              @Override
+              public LowLevelHttpResponse execute() throws IOException {
+                MockLowLevelHttpResponse response = new 
MockLowLevelHttpResponse();
+                if (requestCounter[0] == 0) {
+                  // The first request has to include Basic Auth header
+                  assertTrue(this.getHeaders().containsKey("authorization"));
+                  List<String> authorizationHeaders = 
this.getHeaders().get("authorization");
+                  assertEquals(1, authorizationHeaders.size());
+                  assertTrue(authorizationHeaders.get(0).contains("Basic"));
+                  assertFalse(this.getHeaders().containsKey("cookie"));
+
+                  // Set the response to include Session cookies
+                  response
+                      .setHeaderNames(ImmutableList.of("Set-Cookie", 
"Set-Cookie"))
+                      .setHeaderValues(
+                          ImmutableList.of(
+                              
"ProxySession=JddSdJaGo6FYYmQk6nt8jXxFtq6n3FCFR14ebzRGQ5w;"
+                                  + " HttpOnly; SameSite=Strict;"
+                                  + " Path=/proxy; Max-Age=2592000",
+                              
"Session=JddSdJaGo6FYYmQk6nt8jXxFtq6n3FCFR14ebzRGQ5w;"
+                                  + " HttpOnly; SameSite=Strict;"
+                                  + " Path=/SEMP; Max-Age=2592000"));
+                  response.setStatusCode(200);
+                } else if (requestCounter[0] == 1) {
+                  // The second request does not include Basic Auth header
+                  assertFalse(this.getHeaders().containsKey("authorization"));
+                  // It must include a cookie header
+                  assertTrue(this.getHeaders().containsKey("cookie"));
+                  boolean hasSessionCookie =
+                      this.getHeaders().get("cookie").stream()
+                              .filter(
+                                  c ->
+                                      c.contains(
+                                          
"Session=JddSdJaGo6FYYmQk6nt8jXxFtq6n3FCFR14ebzRGQ5w"))
+                              .count()
+                          == 1;
+                  assertTrue(hasSessionCookie);
+
+                  // Let's assume the Session expired - we return the 401
+                  // unauthorized
+                  response.setStatusCode(401);
+                } else {
+                  // The second request has to be retried with a Basic Auth 
header
+                  // this time
+                  assertTrue(this.getHeaders().containsKey("authorization"));
+                  List<String> authorizationHeaders = 
this.getHeaders().get("authorization");
+                  assertEquals(1, authorizationHeaders.size());
+                  assertTrue(authorizationHeaders.get(0).contains("Basic"));
+                  assertFalse(this.getHeaders().containsKey("cookie"));
+
+                  response.setStatusCode(200);
+                }
+                response.setContentType(Json.MEDIA_TYPE);
+                requestCounter[0]++;
+                return response;
+              }
+            };
+          }
+        };
+
+    HttpRequestFactory requestFactory = transport.createRequestFactory();
+    SempBasicAuthClientExecutor client =
+        new SempBasicAuthClientExecutor(
+            "http://host";, "username", "password", "vpnName", requestFactory);
+
+    // The first, initial request
+    client.getQueueResponse("queue");
+    // The second request, which will try to authenticate with a cookie, and 
then with Basic
+    // Auth when it receives a 401 unauthorized
+    client.getQueueResponse("queue");
+
+    // There should be 3 requests executed:
+    // the first one is the initial one with Basic Auth,
+    // the second one uses the session cookie, but we simulate it being 
expired,
+    // so there should be a third request with Basic Auth to create a new 
session.
+    assertEquals(3, requestCounter[0]);
+  }
+}

Reply via email to