This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 76948035683 [fix][connector] IOConfigUtils support required and
defaultValue annotations (#16785)
76948035683 is described below
commit 769480356836d039eb63a43eafcec32a1843567d
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Aug 2 10:29:39 2022 +0800
[fix][connector] IOConfigUtils support required and defaultValue
annotations (#16785)
### Motivation
Currently, the `FieldDoc` annotation has `required` and `defaultValue`
attributes, but `IOConfigUtils` does not act on them.
The connector we implemented deserializes its configuration using
`IOConfigUtils.loadWithSecrets()`, so it gets neither the required-field
check nor the declared default values.
### Modifications
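- `IOConfigUtils` now honors the `required` and `defaultValue` attributes of
  `FieldDoc`: a missing required field throws `IllegalArgumentException`, and
  a missing optional field is set to its `defaultValue` when that value is
  non-empty.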
---
.../org/apache/pulsar/io/common/IOConfigUtils.java | 17 +++-
.../apache/pulsar/io/common/IOConfigUtilsTest.java | 94 +++++++++++++++++++++-
.../pulsar/io/kinesis/KinesisSinkConfig.java | 2 +-
3 files changed, 107 insertions(+), 6 deletions(-)
diff --git
a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
index b96585828b0..9bedcc37bab 100644
---
a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
+++
b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
@@ -62,8 +63,9 @@ public class IOConfigUtils {
field.setAccessible(true);
for (Annotation annotation : field.getAnnotations()) {
if (annotation.annotationType().equals(FieldDoc.class)) {
- if (((FieldDoc) annotation).sensitive()) {
- String secret = null;
+ FieldDoc fieldDoc = (FieldDoc) annotation;
+ if (fieldDoc.sensitive()) {
+ String secret;
try {
secret = secretsGetter.apply(field.getName());
} catch (Exception e) {
@@ -74,8 +76,17 @@ public class IOConfigUtils {
configs.put(field.getName(), secret);
}
}
+ configs.computeIfAbsent(field.getName(), key -> {
+ if (fieldDoc.required()) {
+ throw new IllegalArgumentException(field.getName()
+ " cannot be null");
+ }
+ String value = fieldDoc.defaultValue();
+ if (!StringUtils.isEmpty(value)) {
+ return value;
+ }
+ return null;
+ });
}
-
}
}
return new ObjectMapper().convertValue(configs, clazz);
diff --git
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index 7317e9296c6..52126c538e0 100644
---
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -44,11 +44,79 @@ import java.util.concurrent.CompletableFuture;
public class IOConfigUtilsTest {
@Data
- static class TestConfig {
+ static class TestDefaultConfig {
+
@FieldDoc(
required = true,
defaultValue = "",
sensitive = true,
+ help = "testRequired"
+ )
+ protected String testRequired;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "defaultStr",
+ sensitive = false,
+ help = "defaultStr"
+ )
+ protected String defaultStr;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "true",
+ sensitive = false,
+ help = "defaultBool"
+ )
+ protected boolean defaultBool;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "100",
+ sensitive = false,
+ help = "defaultInt"
+ )
+ protected int defaultInt;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "100",
+ sensitive = false,
+ help = "defaultLong"
+ )
+ protected long defaultLong;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "100.12",
+ sensitive = false,
+ help = "defaultDouble"
+ )
+ protected double defaultDouble;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "100.10",
+ sensitive = false,
+ help = "defaultFloat"
+ )
+ protected float defaultFloat;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ sensitive = false,
+ help = "noDefault"
+ )
+ protected int noDefault;
+ }
+
+ @Data
+ static class TestConfig {
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ sensitive = true,
help = "password"
)
protected String password;
@@ -65,7 +133,7 @@ public class IOConfigUtilsTest {
* Non-string secrets are not supported at this moment
*/
@FieldDoc(
- required = true,
+ required = false,
defaultValue = "",
sensitive = true,
help = ""
@@ -218,6 +286,28 @@ public class IOConfigUtilsTest {
}
}
+ @Test
+ public void testDefaultValue() {
+
+ // test required field.
+ Assert.expectThrows(IllegalArgumentException.class,
+ () -> IOConfigUtils.loadWithSecrets(new HashMap<>(),
TestDefaultConfig.class, new TestSinkContext()));
+
+
+ // test all default value.
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("testRequired", "test");
+ TestDefaultConfig testDefaultConfig =
+ IOConfigUtils.loadWithSecrets(configMap,
TestDefaultConfig.class, new TestSinkContext());
+ Assert.assertEquals(testDefaultConfig.getDefaultStr(), "defaultStr");
+ Assert.assertEquals(testDefaultConfig.isDefaultBool(), true);
+ Assert.assertEquals(testDefaultConfig.getDefaultInt(), 100);
+ Assert.assertEquals(testDefaultConfig.getDefaultLong(), 100);
+ Assert.assertEquals(testDefaultConfig.getDefaultDouble(),
100.12,0.00001);
+ Assert.assertEquals(testDefaultConfig.getDefaultFloat(),
100.10,0.00001);
+ Assert.assertEquals(testDefaultConfig.getNoDefault(), 0);
+ }
+
@Test
public void testSourceLoadWithSecrets() {
Map<String, Object> configMap = new HashMap<>();
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index 4d74a4ca35c..b0742368f42 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -49,7 +49,7 @@ public class KinesisSinkConfig extends BaseKinesisConfig
implements Serializable
private Boolean skipCertificateValidation = false;
@FieldDoc(
- required = true,
+ required = false,
defaultValue = "ONLY_RAW_PAYLOAD",
help = "Message format in which kinesis sink converts pulsar messages
and publishes to kinesis streams.\n"
+ " #\n"