bzablocki commented on code in PR #31542: URL: https://github.com/apache/beam/pull/31542#discussion_r1668514727
########## sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpContent; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpRequestFactory; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpResponseException; +import com.google.api.client.http.json.JsonHttpContent; +import com.google.api.client.json.gson.GsonFactory; +import java.io.IOException; +import java.io.Serializable; +import java.net.CookieManager; +import java.net.HttpCookie; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +/** + * A class to execute requests to SEMP v2 with Basic Auth authentication. + * + * <p>This approach takes advantage of <a + * href="https://docs.solace.com/Admin/SEMP/SEMP-Security.htm#Sessions">SEMP Sessions</a>. The + * session is established when a user authenticates with HTTP Basic authentication. When the + * response is 401 Unauthorized, the client will execute an additional request with Basic Auth + * header to refresh the token. + */ +class SempBasicAuthClientExecutor implements Serializable { + private static final CookieManager COOKIE_MANAGER = new CookieManager(); + private static final String COOKIES_HEADER = "Set-Cookie"; + + private final String username; + private final String messageVpn; + private final String baseUrl; + private final String password; + private final transient HttpRequestFactory requestFactory; + + SempBasicAuthClientExecutor( + String host, + String username, + String password, + String vpnName, + HttpRequestFactory httpRequestFactory) { + this.baseUrl = String.format("%s/SEMP/v2", host); + this.username = username; + this.messageVpn = vpnName; + this.password = password; + this.requestFactory = httpRequestFactory; + } + + private static String getQueueEndpoint(String messageVpn, String queueName) { + return String.format("/monitor/msgVpns/%s/queues/%s", messageVpn, queueName); + } + + private static String createQueueEndpoint(String messageVpn) { + return String.format("/config/msgVpns/%s/queues", messageVpn); + } + + private static String subscriptionEndpoint(String messageVpn, String queueName) { + return String.format("/config/msgVpns/%s/queues/%s/subscriptions", messageVpn, queueName); + } + + BrokerResponse getQueueResponse(String queueName) throws IOException { + String queryUrl = getQueueEndpoint(messageVpn, queueName); + HttpResponse response = executeGet(new GenericUrl(baseUrl + queryUrl)); + return BrokerResponse.fromHttpResponse(response); + } + + BrokerResponse createQueueResponse(String queueName) throws IOException { + String queryUrl = createQueueEndpoint(messageVpn); + ImmutableMap<String, Object> params = + ImmutableMap.<String, Object>builder() + .put("accessType", "non-exclusive") + .put("queueName", queueName) + .put("owner", username) + .put("permission", "consume") + .put("ingressEnabled", true) + .put("egressEnabled", true) + .build(); + + HttpResponse response = executePost(new GenericUrl(baseUrl + queryUrl), params); + return BrokerResponse.fromHttpResponse(response); + } + + BrokerResponse createSubscriptionResponse(String queueName, String topicName) throws IOException { + String queryUrl = subscriptionEndpoint(messageVpn, queueName); + + ImmutableMap<String, Object> params = + ImmutableMap.<String, Object>builder() + .put("subscriptionTopic", topicName) + .put("queueName", queueName) + .build(); + HttpResponse response = executePost(new GenericUrl(baseUrl + queryUrl), params); + return BrokerResponse.fromHttpResponse(response); + } + + private HttpResponse executeGet(GenericUrl url) throws IOException { + HttpRequest request = requestFactory.buildGetRequest(url); + return execute(request); + } + + private HttpResponse executePost(GenericUrl url, ImmutableMap<String, Object> parameters) + throws IOException { + HttpContent content = new JsonHttpContent(GsonFactory.getDefaultInstance(), parameters); + HttpRequest request = requestFactory.buildPostRequest(url, content); + return execute(request); + } + + private HttpResponse execute(HttpRequest request) throws IOException { + request.setNumberOfRetries(2); + HttpHeaders httpHeaders = new HttpHeaders(); + boolean authFromCookie = COOKIE_MANAGER.getCookieStore().getCookies().size() > 0; Review Comment: Good point, changed to a ConcurrentHashMap to make sure it's thread-safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
