This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f49b5d34342 [HUDI-4991] Allow kafka-like configs to set truststore and
keystore for the SchemaProvider
f49b5d34342 is described below
commit f49b5d3434285f6670f1d1c8d6204d5e975bd73f
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Jan 25 02:55:47 2023 -0500
[HUDI-4991] Allow kafka-like configs to set truststore and keystore for the
SchemaProvider
Update the SchemaRegistryProvider to take in kafka-like configs for
truststore and keystore.
Co-authored-by: Jonathan Vexler <=>
---
.../utilities/schema/SchemaRegistryProvider.java | 64 ++++++++++++++++++++--
1 file changed, 59 insertions(+), 5 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 8303fc2260b..9ade80ecce4 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -26,13 +26,25 @@ import org.apache.hudi.exception.HoodieIOException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
import org.apache.spark.api.java.JavaSparkContext;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
import java.util.Base64;
import java.util.Collections;
import java.util.regex.Matcher;
@@ -54,6 +66,11 @@ public class SchemaRegistryProvider extends SchemaProvider {
public static final String TARGET_SCHEMA_REGISTRY_URL_PROP =
"hoodie.deltastreamer.schemaprovider.registry.targetUrl";
public static final String SCHEMA_CONVERTER_PROP =
"hoodie.deltastreamer.schemaprovider.registry.schemaconverter";
+ public static final String SSL_KEYSTORE_LOCATION_PROP =
"schema.registry.ssl.keystore.location";
+ public static final String SSL_TRUSTSTORE_LOCATION_PROP =
"schema.registry.ssl.truststore.location";
+ public static final String SSL_KEYSTORE_PASSWORD_PROP =
"schema.registry.ssl.keystore.password";
+ public static final String SSL_TRUSTSTORE_PASSWORD_PROP =
"schema.registry.ssl.truststore.password";
+ public static final String SSL_KEY_PASSWORD_PROP =
"schema.registry.ssl.key.password";
}
@FunctionalInterface
@@ -86,24 +103,34 @@ public class SchemaRegistryProvider extends SchemaProvider
{
* @throws IOException
*/
public String fetchSchemaFromRegistry(String registryUrl) throws IOException
{
- URL registry;
HttpURLConnection connection;
Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl);
if (matcher.find()) {
String creds = matcher.group(1);
String urlWithoutCreds = registryUrl.replace(creds + "@", "");
- registry = new URL(urlWithoutCreds);
- connection = (HttpURLConnection) registry.openConnection();
+ connection = getConnection(urlWithoutCreds);
setAuthorizationHeader(matcher.group(1), connection);
} else {
- registry = new URL(registryUrl);
- connection = (HttpURLConnection) registry.openConnection();
+ connection = getConnection(registryUrl);
}
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(getStream(connection));
return node.get("schema").asText();
}
+ private SSLSocketFactory sslSocketFactory;
+
+ protected HttpURLConnection getConnection(String url) throws IOException {
+ URL registry = new URL(url);
+ if (sslSocketFactory != null) {
+ // we cannot cast to HttpsURLConnection if url is http so only cast when
sslSocketFactory is set
+ HttpsURLConnection connection = (HttpsURLConnection)
registry.openConnection();
+ connection.setSSLSocketFactory(sslSocketFactory);
+ return connection;
+ }
+ return (HttpURLConnection) registry.openConnection();
+ }
+
protected void setAuthorizationHeader(String creds, HttpURLConnection
connection) {
String encodedAuth =
Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + encodedAuth);
@@ -116,6 +143,33 @@ public class SchemaRegistryProvider extends SchemaProvider
{
public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+ if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
+ || config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
+ setUpSSLStores();
+ }
+ }
+
+ private void setUpSSLStores() {
+ SSLContextBuilder sslContextBuilder = SSLContexts.custom();
+ try {
+ if (config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
+ sslContextBuilder.loadTrustMaterial(
+ new File(config.getString(Config.SSL_TRUSTSTORE_LOCATION_PROP)),
+
config.getString(Config.SSL_TRUSTSTORE_PASSWORD_PROP).toCharArray(),
+ new TrustSelfSignedStrategy());
+ }
+ if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)) {
+ sslContextBuilder.loadKeyMaterial(
+ new File(config.getString(Config.SSL_KEYSTORE_LOCATION_PROP)),
+ config.getString(Config.SSL_KEYSTORE_PASSWORD_PROP).toCharArray(),
+ config.getString(Config.SSL_KEY_PASSWORD_PROP).toCharArray()
+ );
+ }
+ sslSocketFactory = sslContextBuilder.build().getSocketFactory();
+ } catch (UnrecoverableKeyException | IOException | KeyStoreException |
NoSuchAlgorithmException | CertificateException | KeyManagementException e) {
+ throw new RuntimeException(e);
+ }
+
}
@Override