This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eacd108 Fix connectors nested configs (#4067)
eacd108 is described below
commit eacd1082421c8e463b05f7f0b95e092b0fb5310b
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Wed Apr 17 17:15:27 2019 -0700
Fix connectors nested configs (#4067)
* fix update connectors with nested configs
* clean up logging
---
.../apache/pulsar/functions/utils/SinkConfigUtils.java | 14 ++++++++++++--
.../pulsar/functions/utils/SourceConfigUtils.java | 17 +++++++++++++++--
.../pulsar/functions/utils/SinkConfigUtilsTest.java | 10 +++++++++-
.../pulsar/functions/utils/SourceConfigUtilsTest.java | 10 +++++++++-
4 files changed, 45 insertions(+), 6 deletions(-)
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index bbbe533..d0e71c8 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.utils;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
@@ -31,6 +32,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -239,8 +241,16 @@ public class SinkConfigUtils {
sinkConfig.setArchive("builtin://" +
functionDetails.getSink().getBuiltin());
}
if
(!org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getSink().getConfigs()))
{
- Type type = new TypeToken<Map<String, String>>() {}.getType();
- sinkConfig.setConfigs(new
Gson().fromJson(functionDetails.getSink().getConfigs(), type));
+ TypeReference<HashMap<String,Object>> typeRef
+ = new TypeReference<HashMap<String,Object>>() {};
+ Map<String, Object> configMap;
+ try {
+ configMap =
ObjectMapperFactory.getThreadLocal().readValue(functionDetails.getSink().getConfigs(),
typeRef);
+ } catch (IOException e) {
+ log.error("Failed to read configs for sink {}",
FunctionCommon.getFullyQualifiedName(functionDetails), e);
+ throw new RuntimeException(e);
+ }
+ sinkConfig.setConfigs(configMap);
}
if (!isEmpty(functionDetails.getSecretsMap())) {
Type type = new TypeToken<Map<String, Object>>() {}.getType();
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index b4a812d..f223583 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -19,16 +19,19 @@
package org.apache.pulsar.functions.utils;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -38,12 +41,14 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Path;
+import java.util.HashMap;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static
org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
+@Slf4j
public class SourceConfigUtils {
@Getter
@@ -155,8 +160,16 @@ public class SourceConfigUtils {
sourceConfig.setArchive("builtin://" + sourceSpec.getBuiltin());
}
if (!StringUtils.isEmpty(sourceSpec.getConfigs())) {
- Type type = new TypeToken<Map<String, String>>() {}.getType();
- sourceConfig.setConfigs(new
Gson().fromJson(sourceSpec.getConfigs(), type));
+ TypeReference<HashMap<String,Object>> typeRef
+ = new TypeReference<HashMap<String,Object>>() {};
+ Map<String, Object> configMap;
+ try {
+ configMap =
ObjectMapperFactory.getThreadLocal().readValue(sourceSpec.getConfigs(),
typeRef);
+ } catch (IOException e) {
+ log.error("Failed to read configs for source {}",
FunctionCommon.getFullyQualifiedName(functionDetails), e);
+ throw new RuntimeException(e);
+ }
+ sourceConfig.setConfigs(configMap);
}
if (!isEmpty(functionDetails.getSecretsMap())) {
Type type = new TypeToken<Map<String, Object>>() {}.getType();
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 1bde64d..d72698d 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -53,7 +53,15 @@ public class SinkConfigUtilsTest {
inputSpecs.put("test-input",
ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
sinkConfig.setInputSpecs(inputSpecs);
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
- sinkConfig.setConfigs(new HashMap<>());
+
+ Map<String, String> producerConfigs = new HashMap<>();
+ producerConfigs.put("security.protocal", "SASL_PLAINTEXT");
+ Map<String, Object> configs = new HashMap<>();
+ configs.put("topic", "kafka");
+ configs.put("bootstrapServers", "server-1,server-2");
+ configs.put("producerConfigProperties", producerConfigs);
+
+ sinkConfig.setConfigs(configs);
sinkConfig.setRetainOrdering(false);
sinkConfig.setAutoAck(true);
sinkConfig.setTimeoutMs(2000l);
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index f8c138b..ccdbcdc 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -51,7 +51,15 @@ public class SourceConfigUtilsTest {
sourceConfig.setParallelism(1);
sourceConfig.setRuntimeFlags("-DKerberos");
sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
- sourceConfig.setConfigs(new HashMap<>());
+
+ Map<String, String> consumerConfigs = new HashMap<>();
+ consumerConfigs.put("security.protocal", "SASL_PLAINTEXT");
+ Map<String, Object> configs = new HashMap<>();
+ configs.put("topic", "kafka");
+ configs.put("bootstrapServers", "server-1,server-2");
+ configs.put("consumerConfigProperties", consumerConfigs);
+
+ sourceConfig.setConfigs(configs);
Function.FunctionDetails functionDetails =
SourceConfigUtils.convert(sourceConfig, new
SourceConfigUtils.ExtractedSourceDetails(null, null));
SourceConfig convertedConfig =
SourceConfigUtils.convertFromDetails(functionDetails);