This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 593fcb87b6d [improve][io] Elasticsearch sink: Support loading config
from secrets (#18986)
593fcb87b6d is described below
commit 593fcb87b6d3bd6401eb4ffa4c25d8b3f724e49d
Author: Alexander Preuß <[email protected]>
AuthorDate: Thu Jan 5 14:40:10 2023 +0100
[improve][io] Elasticsearch sink: Support loading config from secrets
(#18986)
---
pulsar-io/elastic-search/pom.xml | 6 ++
.../io/elasticsearch/ElasticSearchConfig.java | 7 ++-
.../pulsar/io/elasticsearch/ElasticSearchSink.java | 2 +-
.../io/elasticsearch/ElasticSearchConfigTests.java | 71 +++++++++++++++-------
4 files changed, 59 insertions(+), 27 deletions(-)
diff --git a/pulsar-io/elastic-search/pom.xml b/pulsar-io/elastic-search/pom.xml
index f18bd6f47df..e55a12d2273 100644
--- a/pulsar-io/elastic-search/pom.xml
+++ b/pulsar-io/elastic-search/pom.xml
@@ -57,6 +57,12 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
diff --git
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index c70be56415f..3e268499b63 100644
---
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -28,6 +28,8 @@ import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
/**
@@ -338,9 +340,8 @@ public class ElasticSearchConfig implements Serializable {
return mapper.readValue(new File(yamlFile), ElasticSearchConfig.class);
}
- public static ElasticSearchConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(new ObjectMapper().writeValueAsString(map),
ElasticSearchConfig.class);
+ public static ElasticSearchConfig load(Map<String, Object> map,
SinkContext sinkContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, ElasticSearchConfig.class,
sinkContext);
}
public void validate() {
diff --git
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 05ca909f0c6..e2566d20638 100644
---
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -73,7 +73,7 @@ public class ElasticSearchSink implements Sink<GenericObject>
{
@Override
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
- elasticSearchConfig = ElasticSearchConfig.load(config);
+ elasticSearchConfig = ElasticSearchConfig.load(config, sinkContext);
elasticSearchConfig.validate();
elasticsearchClient = new ElasticSearchClient(elasticSearchConfig);
if (!Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
diff --git
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
index 28f3c444e88..7d85c027c48 100644
---
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
+++
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
@@ -18,11 +18,12 @@
*/
package org.apache.pulsar.io.elasticsearch;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -34,6 +35,8 @@ import static org.testng.Assert.expectThrows;
public class ElasticSearchConfigTests {
+ private final SinkContext mockContext = Mockito.mock(SinkContext.class);
+
@Test
public final void loadFromYamlFileTest() throws IOException {
File yamlFile = getFile("sinkConfig.yaml");
@@ -57,7 +60,7 @@ public class ElasticSearchConfigTests {
map.put("password", "go-speedie-go");
map.put("primaryFields", "x");
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
assertNotNull(config);
assertEquals(config.getElasticSearchUrl(), "http://localhost:90902");
assertEquals(config.getIndexName(), "myIndex");
@@ -69,8 +72,8 @@ public class ElasticSearchConfigTests {
@Test
public final void defaultValueTest() throws IOException {
- ElasticSearchConfig config =
ElasticSearchConfig.load(Collections.emptyMap());
- assertNull(config.getElasticSearchUrl());
+ Map<String, Object> requiredConfig = Map.of("elasticSearchUrl",
"http://localhost:90902");
+ ElasticSearchConfig config = ElasticSearchConfig.load(requiredConfig,
mockContext);
assertNull(config.getIndexName());
assertEquals(config.getTypeName(), "_doc");
assertNull(config.getUsername());
@@ -121,7 +124,7 @@ public class ElasticSearchConfigTests {
map.put("username", "racerX");
map.put("password", "go-speedie-go");
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
assertNotNull(config);
config.validate();
}
@@ -135,17 +138,17 @@ public class ElasticSearchConfigTests {
map.put("password", "go-speedie-go");
map.put("indexNumberOfReplicas", "0");
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
@Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "elasticSearchUrl not set.")
+ expectedExceptionsMessageRegExp = "elasticSearchUrl cannot be
null")
public final void missingRequiredPropertiesTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
map.put("indexName", "toto");
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
@@ -159,7 +162,7 @@ public class ElasticSearchConfigTests {
map.put("password", "go-speedie-go");
map.put("indexNumberOfShards", "0");
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
@@ -173,7 +176,7 @@ public class ElasticSearchConfigTests {
map.put("password", "go-speedie-go");
map.put("indexNumberOfReplicas", "-1");
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
@@ -185,7 +188,7 @@ public class ElasticSearchConfigTests {
map.put("indexName", "myindex");
map.put("username", "racerX");
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
@@ -197,7 +200,7 @@ public class ElasticSearchConfigTests {
map.put("indexName", "myindex");
map.put("password", "go-speedie-go");
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
@@ -210,35 +213,35 @@ public class ElasticSearchConfigTests {
map.put("password", "go-speedie-go");
map.put("token", "tok");
{
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
expectThrows(IllegalArgumentException.class, () ->
config.validate());
}
map.put("apiKey", "apiKey");
{
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
expectThrows(IllegalArgumentException.class, () ->
config.validate());
}
map.remove("token");
{
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
expectThrows(IllegalArgumentException.class, () ->
config.validate());
}
map.remove("username");
map.remove("password");
{
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
map.put("token", "tok");
map.remove("apiKey");
{
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
map.remove("token");
{
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
}
@@ -249,7 +252,7 @@ public class ElasticSearchConfigTests {
Map<String, Object> map = new HashMap<String, Object> ();
map.put("elasticSearchUrl", "http://localhost:90902");
map.put("connectTimeoutInMs", -1);
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
@@ -259,7 +262,7 @@ public class ElasticSearchConfigTests {
Map<String, Object> map = new HashMap<String, Object> ();
map.put("elasticSearchUrl", "http://localhost:90902");
map.put("connectionRequestTimeoutInMs", -1);
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
@@ -269,7 +272,7 @@ public class ElasticSearchConfigTests {
Map<String, Object> map = new HashMap<String, Object> ();
map.put("elasticSearchUrl", "http://localhost:90902");
map.put("socketTimeoutInMs", -1);
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
@@ -279,7 +282,7 @@ public class ElasticSearchConfigTests {
Map<String, Object> map = new HashMap<String, Object> ();
map.put("elasticSearchUrl", "http://localhost:90902");
map.put("bulkConcurrentRequests", -1);
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
}
@@ -305,7 +308,7 @@ public class ElasticSearchConfigTests {
sslMap.put("provider", "Sun");
map.put("ssl", sslMap);
- ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
mockContext);
config.validate();
assertEquals(config.getSsl().isEnabled(), true);
assertEquals(config.getSsl().getTruststorePath(),
"/ssl/truststore.jks");
@@ -316,4 +319,26 @@ public class ElasticSearchConfigTests {
assertEquals(config.getSsl().getProtocols(), "TLSv1.2,TLSv1.3");
assertEquals(config.getSsl().getProvider(), "Sun");
}
+
+ @Test
+ public final void loadConfigFromSecretsTest() throws IOException {
+ SinkContext contextWithSecrets = Mockito.mock(SinkContext.class);
+
Mockito.when(contextWithSecrets.getSecret("username")).thenReturn("secretUser");
+
Mockito.when(contextWithSecrets.getSecret("password")).thenReturn("$ecret123");
+
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("elasticSearchUrl", "http://localhost:90902");
+ map.put("indexName", "myIndex");
+ map.put("typeName", "doc");
+ map.put("primaryFields", "x");
+
+ ElasticSearchConfig config = ElasticSearchConfig.load(map,
contextWithSecrets);
+ assertNotNull(config);
+ assertEquals(config.getElasticSearchUrl(), "http://localhost:90902");
+ assertEquals(config.getIndexName(), "myIndex");
+ assertEquals(config.getTypeName(), "doc");
+ assertEquals(config.getPrimaryFields(), "x");
+ assertEquals(config.getUsername(), "secretUser");
+ assertEquals(config.getPassword(), "$ecret123");
+ }
}
\ No newline at end of file