This is an automated email from the ASF dual-hosted git repository. jianghaiting pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2d55d55fc3de2f7f598f9a8c143fda0194cd4107 Author: Baodi Shi <[email protected]> AuthorDate: Tue Aug 2 10:29:39 2022 +0800 [fix][connector] IOConfigUtils support required and defaultValue annotations (#16785) Currently, the `FieldDoc` annotation has `required` and `defaultValue` attributes, but `IOConfigUtils` does not honor them. The connector we implemented deserializes its configuration using `IOConfigUtils.loadWithSecrets()`, so neither setting took effect. - Make IOConfigUtils support the `required` and `defaultValue` annotation attributes. (cherry picked from commit 769480356836d039eb63a43eafcec32a1843567d) --- .../org/apache/pulsar/io/common/IOConfigUtils.java | 17 +++- .../apache/pulsar/io/common/IOConfigUtilsTest.java | 94 +++++++++++++++++++++- .../pulsar/io/kinesis/KinesisSinkConfig.java | 2 +- 3 files changed, 107 insertions(+), 6 deletions(-) diff --git a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java index b96585828b0..9bedcc37bab 100644 --- a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java +++ b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import java.util.function.Function; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; @@ -62,8 +63,9 @@ public class IOConfigUtils { field.setAccessible(true); for (Annotation annotation : field.getAnnotations()) { if (annotation.annotationType().equals(FieldDoc.class)) { - if (((FieldDoc) annotation).sensitive()) { - String secret = null; + FieldDoc fieldDoc = (FieldDoc) annotation; + if (fieldDoc.sensitive()) { + String secret; try { secret = secretsGetter.apply(field.getName()); } catch (Exception e) { @@ -74,8 +76,17 @@ public class IOConfigUtils { 
configs.put(field.getName(), secret); } } + configs.computeIfAbsent(field.getName(), key -> { + if (fieldDoc.required()) { + throw new IllegalArgumentException(field.getName() + " cannot be null"); + } + String value = fieldDoc.defaultValue(); + if (!StringUtils.isEmpty(value)) { + return value; + } + return null; + }); } - } } return new ObjectMapper().convertValue(configs, clazz); diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java index e2ec4918bb7..88916291f24 100644 --- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java +++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java @@ -42,11 +42,79 @@ import java.util.concurrent.CompletableFuture; public class IOConfigUtilsTest { @Data - static class TestConfig { + static class TestDefaultConfig { + @FieldDoc( required = true, defaultValue = "", sensitive = true, + help = "testRequired" + ) + protected String testRequired; + + @FieldDoc( + required = false, + defaultValue = "defaultStr", + sensitive = false, + help = "defaultStr" + ) + protected String defaultStr; + + @FieldDoc( + required = false, + defaultValue = "true", + sensitive = false, + help = "defaultBool" + ) + protected boolean defaultBool; + + @FieldDoc( + required = false, + defaultValue = "100", + sensitive = false, + help = "defaultInt" + ) + protected int defaultInt; + + @FieldDoc( + required = false, + defaultValue = "100", + sensitive = false, + help = "defaultLong" + ) + protected long defaultLong; + + @FieldDoc( + required = false, + defaultValue = "100.12", + sensitive = false, + help = "defaultDouble" + ) + protected double defaultDouble; + + @FieldDoc( + required = false, + defaultValue = "100.10", + sensitive = false, + help = "defaultFloat" + ) + protected float defaultFloat; + + @FieldDoc( + required = false, + defaultValue = "", + sensitive = false, + 
help = "noDefault" + ) + protected int noDefault; + } + + @Data + static class TestConfig { + @FieldDoc( + required = false, + defaultValue = "", + sensitive = true, help = "password" ) protected String password; @@ -63,7 +131,7 @@ public class IOConfigUtilsTest { * Non-string secrets are not supported at this moment */ @FieldDoc( - required = true, + required = false, defaultValue = "", sensitive = true, help = "" @@ -211,6 +279,28 @@ public class IOConfigUtilsTest { } } + @Test + public void testDefaultValue() { + + // test required field. + Assert.expectThrows(IllegalArgumentException.class, + () -> IOConfigUtils.loadWithSecrets(new HashMap<>(), TestDefaultConfig.class, new TestSinkContext())); + + + // test all default value. + Map<String, Object> configMap = new HashMap<>(); + configMap.put("testRequired", "test"); + TestDefaultConfig testDefaultConfig = + IOConfigUtils.loadWithSecrets(configMap, TestDefaultConfig.class, new TestSinkContext()); + Assert.assertEquals(testDefaultConfig.getDefaultStr(), "defaultStr"); + Assert.assertEquals(testDefaultConfig.isDefaultBool(), true); + Assert.assertEquals(testDefaultConfig.getDefaultInt(), 100); + Assert.assertEquals(testDefaultConfig.getDefaultLong(), 100); + Assert.assertEquals(testDefaultConfig.getDefaultDouble(), 100.12,0.00001); + Assert.assertEquals(testDefaultConfig.getDefaultFloat(), 100.10,0.00001); + Assert.assertEquals(testDefaultConfig.getNoDefault(), 0); + } + @Test public void testSourceLoadWithSecrets() { Map<String, Object> configMap = new HashMap<>(); diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java index abaf2aba3e7..3d70a2204b4 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java @@ -35,7 +35,7 @@ public class KinesisSinkConfig 
extends BaseKinesisConfig implements Serializable private static final long serialVersionUID = 1L; @FieldDoc( - required = true, + required = false, defaultValue = "ONLY_RAW_PAYLOAD", help = "Message format in which kinesis sink converts pulsar messages and publishes to kinesis streams.\n" + " #\n"
