This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 35b0a051345176b2b141b4265a432f6817aef53f Author: Vova Kolmakov <[email protected]> AuthorDate: Thu Apr 11 21:16:14 2024 +0700 [HUDI-6441] Passing custom Headers with Hudi Callback URL (#10970) --- .../http/HoodieWriteCommitHttpCallbackClient.java | 46 ++++- .../config/HoodieWriteCommitCallbackConfig.java | 15 ++ .../client/http/TestCallbackHttpClient.java | 202 +++++++++++++++++++++ .../hudi/callback/http/TestCallbackHttpClient.java | 143 --------------- 4 files changed, 260 insertions(+), 146 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java index d9248ed20f1..037e84b3d00 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java @@ -18,6 +18,8 @@ package org.apache.hudi.callback.client.http; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.config.HoodieWriteCommitCallbackConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -34,6 +36,9 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.StringTokenizer; /** * Write commit callback http client. @@ -43,36 +48,42 @@ public class HoodieWriteCommitHttpCallbackClient implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteCommitHttpCallbackClient.class); public static final String HEADER_KEY_API_KEY = "HUDI-CALLBACK-KEY"; + static final String HEADERS_DELIMITER = ";"; + static final String HEADERS_KV_DELIMITER = ":"; private final String apiKey; private final String url; private final CloseableHttpClient client; private HoodieWriteConfig writeConfig; + private final Map<String, String> customHeaders; public HoodieWriteCommitHttpCallbackClient(HoodieWriteConfig config) { this.writeConfig = config; this.apiKey = getApiKey(); this.url = getUrl(); this.client = getClient(); + this.customHeaders = parseCustomHeaders(); } - public HoodieWriteCommitHttpCallbackClient(String apiKey, String url, CloseableHttpClient client) { + public HoodieWriteCommitHttpCallbackClient(String apiKey, String url, CloseableHttpClient client, Map<String, String> customHeaders) { this.apiKey = apiKey; this.url = url; this.client = client; + this.customHeaders = customHeaders != null ? customHeaders : new HashMap<>(); } public void send(String callbackMsg) { HttpPost request = new HttpPost(url); request.setHeader(HEADER_KEY_API_KEY, apiKey); request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()); + customHeaders.forEach(request::setHeader); request.setEntity(new StringEntity(callbackMsg, ContentType.APPLICATION_JSON)); try (CloseableHttpResponse response = client.execute(request)) { int statusCode = response.getStatusLine().getStatusCode(); if (statusCode >= 300) { - LOG.warn(String.format("Failed to send callback message. Response was %s", response)); + LOG.warn("Failed to send callback message. Response was {}", response); } else { - LOG.info(String.format("Sent Callback data to %s successfully !", url)); + LOG.info("Sent Callback data with {} custom headers to {} successfully !", customHeaders.size(), url); } } catch (IOException e) { LOG.warn("Failed to send callback.", e); @@ -101,8 +112,37 @@ public class HoodieWriteCommitHttpCallbackClient implements Closeable { return writeConfig.getInt(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_TIMEOUT_IN_SECONDS); } + private Map<String, String> parseCustomHeaders() { + Map<String, String> headers = new HashMap<>(); + String headersString = writeConfig.getString(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_CUSTOM_HEADERS); + if (!StringUtils.isNullOrEmpty(headersString)) { + StringTokenizer tokenizer = new StringTokenizer(headersString, HEADERS_DELIMITER); + while (tokenizer.hasMoreTokens()) { + String token = tokenizer.nextToken(); + if (!StringUtils.isNullOrEmpty(token)) { + String[] keyValue = token.split(HEADERS_KV_DELIMITER); + if (keyValue.length == 2) { + String trimKey = keyValue[0].trim(); + String trimValue = keyValue[1].trim(); + if (trimKey.length() > 0 && trimValue.length() > 0) { + headers.put(trimKey, trimValue); + } + } else { + LOG.warn("Unable to parse some custom headers. Supported format is: Header_name1:Header value1;Header_name2:Header value2"); + } + } + } + } + return headers; + } + @Override public void close() throws IOException { client.close(); } + + @VisibleForTesting + String getCustomHeaders() { + return customHeaders.toString(); + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java index 4ca52e48318..26f8aeb53ac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.util.StringUtils; import java.io.File; import java.io.FileReader; @@ -76,6 +77,13 @@ public class HoodieWriteCommitCallbackConfig extends HoodieConfig { .sinceVersion("0.6.0") .withDocumentation("Callback timeout in seconds."); + public static final ConfigProperty<String> CALLBACK_HTTP_CUSTOM_HEADERS = ConfigProperty + .key(CALLBACK_PREFIX + "http.custom.headers") + .noDefaultValue() + .markAdvanced() + .sinceVersion("0.15.0") + .withDocumentation("Http callback custom headers. Format: HeaderName1:HeaderValue1;HeaderName2:HeaderValue2"); + /** * @deprecated Use {@link #TURN_CALLBACK_ON} and its methods instead */ @@ -171,6 +179,13 @@ public class HoodieWriteCommitCallbackConfig extends HoodieConfig { return this; } + public Builder withCustomHeaders(String customHeaders) { + if (!StringUtils.isNullOrEmpty(customHeaders)) { + writeCommitCallbackConfig.setValue(CALLBACK_HTTP_CUSTOM_HEADERS, customHeaders); + } + return this; + } + public HoodieWriteCommitCallbackConfig build() { writeCommitCallbackConfig.setDefaults(HoodieWriteCommitCallbackConfig.class.getName()); return writeCommitCallbackConfig; diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/callback/client/http/TestCallbackHttpClient.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/callback/client/http/TestCallbackHttpClient.java new file mode 100644 index 00000000000..2de4ed08524 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/callback/client/http/TestCallbackHttpClient.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.callback.client.http; + +import org.apache.hudi.config.HoodieWriteCommitCallbackConfig; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.Logger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit test for {@link HoodieWriteCommitHttpCallbackClient}. + */ +@ExtendWith(MockitoExtension.class) +class TestCallbackHttpClient { + + public static final String FAKE_API_KEY = "fake_api_key"; + public static final String FAKE_URL = "fake_url"; + public static final String CALLBACK_MSG = "{}"; + public static final String RESPONSE_UNAUTHORIZED = "unauthorized"; + @Mock + Appender appender; + + @Captor + ArgumentCaptor<LogEvent> logCaptor; + + @Mock + CloseableHttpClient httpClient; + + @Mock + CloseableHttpResponse httpResponse; + + @Mock + StatusLine statusLine; + + private Level initialLogLevel; + + @BeforeEach + void prepareAppender() { + when(appender.getName()).thenReturn("MockAppender-" + UUID.randomUUID()); + when(appender.isStarted()).thenReturn(true); + when(appender.isStopped()).thenReturn(false); + Logger logger = (Logger) LogManager.getLogger(HoodieWriteCommitHttpCallbackClient.class); + initialLogLevel = logger.getLevel(); + logger.setLevel(Level.DEBUG); + logger.addAppender(appender); + } + + @AfterEach + void resetMocks() { + Logger logger = (Logger) LogManager.getLogger(HoodieWriteCommitHttpCallbackClient.class); + logger.setLevel(initialLogLevel); + logger.removeAppender(appender); + reset(appender, httpClient, httpResponse, statusLine); + } + + private void mockResponse(int statusCode) { + when(statusLine.getStatusCode()).thenReturn(statusCode); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + try { + when(httpClient.execute(any())).thenReturn(httpResponse); + } catch (IOException e) { + fail(e.getMessage(), e); + } + } + + @Test + void sendPayloadShouldLogWhenRequestFailed() throws IOException { + when(httpClient.execute(any())).thenThrow(IOException.class); + + HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient = + new HoodieWriteCommitHttpCallbackClient(FAKE_API_KEY, FAKE_URL, httpClient, null); + hoodieWriteCommitCallBackHttpClient.send(CALLBACK_MSG); + + verify(appender).append(logCaptor.capture()); + assertEquals("Failed to send callback.", logCaptor.getValue().getMessage().getFormattedMessage()); + assertEquals(Level.WARN, logCaptor.getValue().getLevel()); + } + + @Test + void sendPayloadShouldLogUnsuccessfulSending() { + mockResponse(401); + when(httpResponse.toString()).thenReturn(RESPONSE_UNAUTHORIZED); + + HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient = + new HoodieWriteCommitHttpCallbackClient(FAKE_API_KEY, FAKE_URL, httpClient, null); + hoodieWriteCommitCallBackHttpClient.send(CALLBACK_MSG); + + verify(appender).append(logCaptor.capture()); + assertEquals("Failed to send callback message. Response was " + RESPONSE_UNAUTHORIZED, logCaptor.getValue().getMessage().getFormattedMessage()); + assertEquals(Level.WARN, logCaptor.getValue().getLevel()); + } + + @Test + void sendPayloadShouldLogSuccessfulSending() { + mockResponse(202); + + Map<String, String> customHeaders = new HashMap<>(); + customHeaders.put("key1", "val1"); + customHeaders.put("key2", "val2"); + HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient = + new HoodieWriteCommitHttpCallbackClient(FAKE_API_KEY, FAKE_URL, httpClient, customHeaders); + hoodieWriteCommitCallBackHttpClient.send(CALLBACK_MSG); + + verify(appender).append(logCaptor.capture()); + assertTrue(logCaptor.getValue().getMessage().getFormattedMessage().startsWith("Sent Callback data with 2 custom headers")); + assertEquals(Level.INFO, logCaptor.getValue().getLevel()); + } + + @Test + void testParsingCustomHeaders() { + String customHeaders = "Authorization " + HoodieWriteCommitHttpCallbackClient.HEADERS_KV_DELIMITER + "Basic 12345678"; + HoodieWriteCommitHttpCallbackClient client = makeClient(customHeaders); + assertEquals("{Authorization=Basic 12345678}", client.getCustomHeaders()); + customHeaders = "Authorization " + HoodieWriteCommitHttpCallbackClient.HEADERS_KV_DELIMITER + "Basic 12345678" + HoodieWriteCommitHttpCallbackClient.HEADERS_DELIMITER + + " another_header_key " + HoodieWriteCommitHttpCallbackClient.HEADERS_KV_DELIMITER + " another_header_value "; + client = makeClient(customHeaders); + assertEquals("{Authorization=Basic 12345678, another_header_key=another_header_value}", client.getCustomHeaders()); + customHeaders = "Authorization" + HoodieWriteCommitHttpCallbackClient.HEADERS_KV_DELIMITER + "Basic 12345678" + HoodieWriteCommitHttpCallbackClient.HEADERS_DELIMITER; + client = makeClient(customHeaders); + assertEquals("{Authorization=Basic 12345678}", client.getCustomHeaders()); + customHeaders = "Authorization" + HoodieWriteCommitHttpCallbackClient.HEADERS_KV_DELIMITER + "Basic 12345678" + HoodieWriteCommitHttpCallbackClient.HEADERS_DELIMITER + "uu"; + client = makeClient(customHeaders); + assertEquals("{Authorization=Basic 12345678}", client.getCustomHeaders()); + customHeaders = "Authorization" + HoodieWriteCommitHttpCallbackClient.HEADERS_KV_DELIMITER; + client = makeClient(customHeaders); + assertEquals("{}", client.getCustomHeaders()); + customHeaders = HoodieWriteCommitHttpCallbackClient.HEADERS_KV_DELIMITER + "Authorization"; + client = makeClient(customHeaders); + assertEquals("{}", client.getCustomHeaders()); + customHeaders = "Authorization" + HoodieWriteCommitHttpCallbackClient.HEADERS_KV_DELIMITER + "Basic 12345678" + HoodieWriteCommitHttpCallbackClient.HEADERS_KV_DELIMITER + + "Second header" + HoodieWriteCommitHttpCallbackClient.HEADERS_KV_DELIMITER + "val"; + client = makeClient(customHeaders); + assertEquals("{}", client.getCustomHeaders()); + customHeaders = null; + client = makeClient(customHeaders); + assertEquals("{}", client.getCustomHeaders()); + customHeaders = ""; + client = makeClient(customHeaders); + assertEquals("{}", client.getCustomHeaders()); + customHeaders = " "; + client = makeClient(customHeaders); + assertEquals("{}", client.getCustomHeaders()); + customHeaders = " " + HoodieWriteCommitHttpCallbackClient.HEADERS_KV_DELIMITER + " "; + client = makeClient(customHeaders); + assertEquals("{}", client.getCustomHeaders()); + } + + private HoodieWriteCommitHttpCallbackClient makeClient(String customHeaders) { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath("path") + .withCallbackConfig(HoodieWriteCommitCallbackConfig.newBuilder() + .withCallbackHttpApiKey(FAKE_API_KEY) + .withCallbackHttpUrl(FAKE_URL) + .withCustomHeaders(customHeaders) + .build()) + .build(); + return new HoodieWriteCommitHttpCallbackClient(config); + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java deleted file mode 100644 index 49b948dd8c0..00000000000 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.callback.http; - -import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient; - -import org.apache.http.StatusLine; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.core.Appender; -import org.apache.logging.log4j.core.LogEvent; -import org.apache.logging.log4j.core.Logger; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import java.io.IOException; -import java.util.UUID; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Unit test for {@link HoodieWriteCommitHttpCallbackClient}. - */ -@ExtendWith(MockitoExtension.class) -public class TestCallbackHttpClient { - - @Mock - Appender appender; - - @Captor - ArgumentCaptor<LogEvent> logCaptor; - - @Mock - CloseableHttpClient httpClient; - - @Mock - CloseableHttpResponse httpResponse; - - @Mock - StatusLine statusLine; - - private Level initialLogLevel; - - @BeforeEach - void prepareAppender() { - when(appender.getName()).thenReturn("MockAppender-" + UUID.randomUUID()); - when(appender.isStarted()).thenReturn(true); - when(appender.isStopped()).thenReturn(false); - Logger logger = (Logger) LogManager.getLogger(HoodieWriteCommitHttpCallbackClient.class); - initialLogLevel = logger.getLevel(); - logger.setLevel(Level.DEBUG); - logger.addAppender(appender); - } - - @AfterEach - void resetMocks() { - Logger logger = (Logger) LogManager.getLogger(HoodieWriteCommitHttpCallbackClient.class); - logger.setLevel(initialLogLevel); - logger.removeAppender(appender); - reset(appender, httpClient, httpResponse, statusLine); - } - - private void mockResponse(int statusCode) { - when(statusLine.getStatusCode()).thenReturn(statusCode); - when(httpResponse.getStatusLine()).thenReturn(statusLine); - try { - when(httpClient.execute(any())).thenReturn(httpResponse); - } catch (IOException e) { - fail(e.getMessage(), e); - } - } - - @Test - public void sendPayloadShouldLogWhenRequestFailed() throws IOException { - when(httpClient.execute(any())).thenThrow(IOException.class); - - HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient = - new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient); - hoodieWriteCommitCallBackHttpClient.send("{}"); - - verify(appender).append(logCaptor.capture()); - assertEquals("Failed to send callback.", logCaptor.getValue().getMessage().getFormattedMessage()); - assertEquals(Level.WARN, logCaptor.getValue().getLevel()); - } - - @Test - public void sendPayloadShouldLogUnsuccessfulSending() { - mockResponse(401); - when(httpResponse.toString()).thenReturn("unauthorized"); - - HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient = - new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient); - hoodieWriteCommitCallBackHttpClient.send("{}"); - - verify(appender).append(logCaptor.capture()); - assertEquals("Failed to send callback message. Response was unauthorized", logCaptor.getValue().getMessage().getFormattedMessage()); - assertEquals(Level.WARN, logCaptor.getValue().getLevel()); - } - - @Test - public void sendPayloadShouldLogSuccessfulSending() { - mockResponse(202); - - HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient = - new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient); - hoodieWriteCommitCallBackHttpClient.send("{}"); - - verify(appender).append(logCaptor.capture()); - assertTrue(logCaptor.getValue().getMessage().getFormattedMessage().startsWith("Sent Callback data")); - assertEquals(Level.INFO, logCaptor.getValue().getLevel()); - } - -}
