This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 593fcb87b6d [improve][io] Elasticsearch sink: Support loading config 
from secrets (#18986)
593fcb87b6d is described below

commit 593fcb87b6d3bd6401eb4ffa4c25d8b3f724e49d
Author: Alexander Preuß <[email protected]>
AuthorDate: Thu Jan 5 14:40:10 2023 +0100

    [improve][io] Elasticsearch sink: Support loading config from secrets 
(#18986)
---
 pulsar-io/elastic-search/pom.xml                   |  6 ++
 .../io/elasticsearch/ElasticSearchConfig.java      |  7 ++-
 .../pulsar/io/elasticsearch/ElasticSearchSink.java |  2 +-
 .../io/elasticsearch/ElasticSearchConfigTests.java | 71 +++++++++++++++-------
 4 files changed, 59 insertions(+), 27 deletions(-)

diff --git a/pulsar-io/elastic-search/pom.xml b/pulsar-io/elastic-search/pom.xml
index f18bd6f47df..e55a12d2273 100644
--- a/pulsar-io/elastic-search/pom.xml
+++ b/pulsar-io/elastic-search/pom.xml
@@ -57,6 +57,12 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index c70be56415f..3e268499b63 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import lombok.Data;
 import lombok.experimental.Accessors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 /**
@@ -338,9 +340,8 @@ public class ElasticSearchConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), ElasticSearchConfig.class);
     }
 
-    public static ElasticSearchConfig load(Map<String, Object> map) throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(new ObjectMapper().writeValueAsString(map), ElasticSearchConfig.class);
+    public static ElasticSearchConfig load(Map<String, Object> map, SinkContext sinkContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, ElasticSearchConfig.class, sinkContext);
     }
 
     public void validate() {
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 05ca909f0c6..e2566d20638 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -73,7 +73,7 @@ public class ElasticSearchSink implements Sink<GenericObject> 
{
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticSearchConfig.load(config, sinkContext);
         elasticSearchConfig.validate();
         elasticsearchClient = new ElasticSearchClient(elasticSearchConfig);
         if (!Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
index 28f3c444e88..7d85c027c48 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
@@ -18,11 +18,12 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -34,6 +35,8 @@ import static org.testng.Assert.expectThrows;
 
 public class ElasticSearchConfigTests {
 
+    private final SinkContext mockContext = Mockito.mock(SinkContext.class);
+
     @Test
     public final void loadFromYamlFileTest() throws IOException {
         File yamlFile = getFile("sinkConfig.yaml");
@@ -57,7 +60,7 @@ public class ElasticSearchConfigTests {
         map.put("password", "go-speedie-go");
         map.put("primaryFields", "x");
 
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         assertNotNull(config);
         assertEquals(config.getElasticSearchUrl(), "http://localhost:90902");
         assertEquals(config.getIndexName(), "myIndex");
@@ -69,8 +72,8 @@ public class ElasticSearchConfigTests {
 
     @Test
     public final void defaultValueTest() throws IOException {
-        ElasticSearchConfig config = 
ElasticSearchConfig.load(Collections.emptyMap());
-        assertNull(config.getElasticSearchUrl());
+        Map<String, Object> requiredConfig = Map.of("elasticSearchUrl", "http://localhost:90902");
+        ElasticSearchConfig config = ElasticSearchConfig.load(requiredConfig, 
mockContext);
         assertNull(config.getIndexName());
         assertEquals(config.getTypeName(), "_doc");
         assertNull(config.getUsername());
@@ -121,7 +124,7 @@ public class ElasticSearchConfigTests {
         map.put("username", "racerX");
         map.put("password", "go-speedie-go");
 
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         assertNotNull(config);
         config.validate();
     }
@@ -135,17 +138,17 @@ public class ElasticSearchConfigTests {
         map.put("password", "go-speedie-go");
         map.put("indexNumberOfReplicas", "0");
 
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         config.validate();
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class,
-            expectedExceptionsMessageRegExp = "elasticSearchUrl not set.")
+            expectedExceptionsMessageRegExp = "elasticSearchUrl cannot be 
null")
     public final void missingRequiredPropertiesTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
         map.put("indexName", "toto");
 
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         config.validate();
     }
 
@@ -159,7 +162,7 @@ public class ElasticSearchConfigTests {
         map.put("password", "go-speedie-go");
         map.put("indexNumberOfShards", "0");
 
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         config.validate();
     }
 
@@ -173,7 +176,7 @@ public class ElasticSearchConfigTests {
         map.put("password", "go-speedie-go");
         map.put("indexNumberOfReplicas", "-1");
 
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         config.validate();
     }
 
@@ -185,7 +188,7 @@ public class ElasticSearchConfigTests {
         map.put("indexName", "myindex");
         map.put("username", "racerX");
 
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         config.validate();
     }
 
@@ -197,7 +200,7 @@ public class ElasticSearchConfigTests {
         map.put("indexName", "myindex");
         map.put("password", "go-speedie-go");
 
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         config.validate();
     }
 
@@ -210,35 +213,35 @@ public class ElasticSearchConfigTests {
         map.put("password", "go-speedie-go");
         map.put("token", "tok");
         {
-            ElasticSearchConfig config = ElasticSearchConfig.load(map);
+            ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
             expectThrows(IllegalArgumentException.class, () -> 
config.validate());
         }
         map.put("apiKey", "apiKey");
         {
-            ElasticSearchConfig config = ElasticSearchConfig.load(map);
+            ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
             expectThrows(IllegalArgumentException.class, () -> 
config.validate());
         }
         map.remove("token");
         {
-            ElasticSearchConfig config = ElasticSearchConfig.load(map);
+            ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
             expectThrows(IllegalArgumentException.class, () -> 
config.validate());
         }
         map.remove("username");
         map.remove("password");
         {
-            ElasticSearchConfig config = ElasticSearchConfig.load(map);
+            ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
             config.validate();
         }
         map.put("token", "tok");
         map.remove("apiKey");
         {
-            ElasticSearchConfig config = ElasticSearchConfig.load(map);
+            ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
             config.validate();
         }
         map.remove("token");
 
         {
-            ElasticSearchConfig config = ElasticSearchConfig.load(map);
+            ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
             config.validate();
         }
     }
@@ -249,7 +252,7 @@ public class ElasticSearchConfigTests {
         Map<String, Object> map = new HashMap<String, Object> ();
         map.put("elasticSearchUrl", "http://localhost:90902");
         map.put("connectTimeoutInMs", -1);
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         config.validate();
     }
 
@@ -259,7 +262,7 @@ public class ElasticSearchConfigTests {
         Map<String, Object> map = new HashMap<String, Object> ();
         map.put("elasticSearchUrl", "http://localhost:90902");
         map.put("connectionRequestTimeoutInMs", -1);
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         config.validate();
     }
 
@@ -269,7 +272,7 @@ public class ElasticSearchConfigTests {
         Map<String, Object> map = new HashMap<String, Object> ();
         map.put("elasticSearchUrl", "http://localhost:90902");
         map.put("socketTimeoutInMs", -1);
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         config.validate();
     }
 
@@ -279,7 +282,7 @@ public class ElasticSearchConfigTests {
         Map<String, Object> map = new HashMap<String, Object> ();
         map.put("elasticSearchUrl", "http://localhost:90902");
         map.put("bulkConcurrentRequests", -1);
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         config.validate();
     }
 
@@ -305,7 +308,7 @@ public class ElasticSearchConfigTests {
         sslMap.put("provider", "Sun");
         map.put("ssl", sslMap);
 
-        ElasticSearchConfig config = ElasticSearchConfig.load(map);
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
mockContext);
         config.validate();
         assertEquals(config.getSsl().isEnabled(), true);
         assertEquals(config.getSsl().getTruststorePath(), 
"/ssl/truststore.jks");
@@ -316,4 +319,26 @@ public class ElasticSearchConfigTests {
         assertEquals(config.getSsl().getProtocols(), "TLSv1.2,TLSv1.3");
         assertEquals(config.getSsl().getProvider(), "Sun");
     }
+
+    @Test
+    public final void loadConfigFromSecretsTest() throws IOException {
+        SinkContext contextWithSecrets = Mockito.mock(SinkContext.class);
+        
Mockito.when(contextWithSecrets.getSecret("username")).thenReturn("secretUser");
+        
Mockito.when(contextWithSecrets.getSecret("password")).thenReturn("$ecret123");
+
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("elasticSearchUrl", "http://localhost:90902");
+        map.put("indexName", "myIndex");
+        map.put("typeName", "doc");
+        map.put("primaryFields", "x");
+
+        ElasticSearchConfig config = ElasticSearchConfig.load(map, 
contextWithSecrets);
+        assertNotNull(config);
+        assertEquals(config.getElasticSearchUrl(), "http://localhost:90902");
+        assertEquals(config.getIndexName(), "myIndex");
+        assertEquals(config.getTypeName(), "doc");
+        assertEquals(config.getPrimaryFields(), "x");
+        assertEquals(config.getUsername(), "secretUser");
+        assertEquals(config.getPassword(), "$ecret123");
+    }
 }
\ No newline at end of file

Reply via email to