This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e915b8e Extract config validation class into separate module (#7734)
e915b8e is described below
commit e915b8e146f4cb735a316fcfebddf3162a149367
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Tue Aug 4 10:52:46 2020 -0700
Extract config validation class into separate module (#7734)
* Extract config validation class into separate module
* fix tests
* adding example
* fixing
Co-authored-by: Jerry Peng <[email protected]>
---
conf/functions_worker.yml | 3 ++
pom.xml | 3 +-
pulsar-config-validation/pom.xml | 42 +++++++++++++++
.../config/validation}/ConfigValidation.java | 39 +++++++++-----
.../validation}/ConfigValidationAnnotations.java | 11 +---
.../config/validation}/ConfigValidationUtils.java | 2 +-
.../pulsar/config/validation}/Validator.java | 2 +-
.../pulsar/config/validation}/ValidatorImpls.java | 38 ++++----------
.../pulsar/config/validation}/package-info.java | 2 +-
.../config/validation}/ConfigValidationTest.java | 22 +++-----
.../config/validation}/ValidatorImplsTest.java | 13 +----
pulsar-functions/utils/pom.xml | 6 +++
.../pulsar/functions/utils/SinkConfigUtils.java | 2 +-
.../pulsar/functions/utils/SourceConfigUtils.java | 2 +-
.../functions/utils/SinkConfigUtilsTest.java | 2 +-
.../functions/utils/SourceConfigUtilsTest.java | 3 +-
pulsar-io/data-generator/pom.xml | 7 +++
.../io/datagenerator/DataGeneratorSource.java | 7 +--
...rSource.java => DataGeneratorSourceConfig.java} | 61 ++++++++++------------
.../resources/META-INF/services/pulsar-io.yaml | 1 +
20 files changed, 146 insertions(+), 122 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 34cf09a..cf01344 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -235,3 +235,6 @@ tlsCertRefreshCheckDurationSec: 300
connectorsDirectory: ./connectors
functionsDirectory: ./functions
+
+# Should connector config be validated during during submission
+validateConnectorConfig: false
diff --git a/pom.xml b/pom.xml
index 14c3b15..d817636 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,6 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
-
<parent>
<groupId>org.apache</groupId>
<artifactId>apache</artifactId>
@@ -1687,6 +1686,7 @@ flexible messaging model and an intuitive client
API.</description>
<module>dashboard</module>
<module>pulsar-broker-auth-sasl</module>
<module>pulsar-client-auth-sasl</module>
+ <module>pulsar-config-validation</module>
<!-- transaction related modules -->
<module>pulsar-transaction</module>
@@ -1740,6 +1740,7 @@ flexible messaging model and an intuitive client
API.</description>
<module>pulsar-zookeeper</module>
<module>pulsar-broker-auth-sasl</module>
<module>pulsar-client-auth-sasl</module>
+ <module>pulsar-config-validation</module>
<!-- transaction related modules -->
<module>pulsar-transaction</module>
diff --git a/pulsar-config-validation/pom.xml b/pulsar-config-validation/pom.xml
new file mode 100644
index 0000000..4b056a8
--- /dev/null
+++ b/pulsar-config-validation/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>2.7.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-config-validation</artifactId>
+ <description>Annotation based config validation for Pulsar</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidation.java
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidation.java
similarity index 80%
rename from
pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidation.java
rename to
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidation.java
index dc607ab..57da500 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidation.java
+++
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidation.java
@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
+
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
@@ -25,15 +26,19 @@ import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
-
/**
* The class that does the validation of all the members of a given object.
*/
-@Slf4j
public class ConfigValidation {
- public static void validateConfig(Object config) {
+ private static final Class DEFAULT_ANNOTATION_CLASS =
ConfigValidationAnnotations.class;
+
+ /**
+ * Validate the config object with annotations from annotationClass
+ * @param config config object
+ * @param annotationClass class with annotations to use
+ */
+ public static void validateConfig(Object config, Class annotationClass) {
for (Field field : config.getClass().getDeclaredFields()) {
Object value = null;
field.setAccessible(true);
@@ -42,25 +47,33 @@ public class ConfigValidation {
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
- validateField(field, value);
+ validateField(field, value, annotationClass);
}
- validateClass(config);
+ validateClass(config, annotationClass);
+ }
+
+ /**
+ * Validate the config object with default annotation class
+ * @param config config object
+ */
+ public static void validateConfig(Object config) {
+ validateConfig(config, DEFAULT_ANNOTATION_CLASS);
}
- private static void validateClass(Object config) {
- processAnnotations(config.getClass().getAnnotations(),
config.getClass().getName(), config);
+ private static void validateClass(Object config, Class annotationClass) {
+ processAnnotations(config.getClass().getAnnotations(),
config.getClass().getName(), config, annotationClass);
}
- private static void validateField(Field field, Object value) {
- processAnnotations(field.getAnnotations(), field.getName(), value);
+ private static void validateField(Field field, Object value, Class
annotationClass) {
+ processAnnotations(field.getAnnotations(), field.getName(), value,
annotationClass);
}
- private static void processAnnotations(Annotation[] annotations, String
fieldName, Object value) {
+ private static void processAnnotations(Annotation[] annotations, String
fieldName, Object value, Class annotationClass) {
try {
for (Annotation annotation : annotations) {
String type = annotation.annotationType().getName();
Class<?> validatorClass = null;
- Class<?>[] classes =
ConfigValidationAnnotations.class.getDeclaredClasses();
+ Class<?>[] classes = annotationClass.getDeclaredClasses();
//check if annotation is one of our
for (Class<?> clazz : classes) {
if (clazz.getName().equals(type)) {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationAnnotations.java
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationAnnotations.java
similarity index 94%
rename from
pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationAnnotations.java
rename to
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationAnnotations.java
index 1e95430..aa7c3c2 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationAnnotations.java
+++
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationAnnotations.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@@ -132,15 +132,6 @@ public class ConfigValidationAnnotations {
}
/**
- * checks if the topic name is valid.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target(ElementType.FIELD)
- public @interface TopicName {
- Class<?> validatorClass() default
ValidatorImpls.TopicNameValidator.class;
- }
-
- /**
* Field names for annotations.
*/
public static class ValidatorParams {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationUtils.java
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationUtils.java
similarity index 99%
rename from
pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationUtils.java
rename to
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationUtils.java
index 2add624..ee393e8 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationUtils.java
+++
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
import java.util.Map;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/Validator.java
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/Validator.java
similarity index 96%
rename from
pulsar-common/src/main/java/org/apache/pulsar/common/validator/Validator.java
rename to
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/Validator.java
index 3e78a18..77032e3 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/Validator.java
+++
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/Validator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
import java.util.Map;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ValidatorImpls.java
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ValidatorImpls.java
similarity index 94%
rename from
pulsar-common/src/main/java/org/apache/pulsar/common/validator/ValidatorImpls.java
rename to
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ValidatorImpls.java
index d549329..23cb924 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ValidatorImpls.java
+++
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ValidatorImpls.java
@@ -16,22 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.naming.TopicName;
-
/**
* System defined Validator Annotations.
*/
-@Slf4j
public class ValidatorImpls {
+
+ private static final Logger log =
LoggerFactory.getLogger(ValidatorImpls.class);
+
/**
* Validates a positive number.
*/
@@ -272,9 +273,12 @@ public class ValidatorImpls {
/**
* validates that the string is equal to one of the specified ones in the
list.
*/
- @NoArgsConstructor
public static class StringValidator extends Validator {
+ public StringValidator() {
+
+ }
+
private HashSet<String> acceptedValues = null;
public StringValidator(Map<String, Object> params) {
@@ -350,26 +354,6 @@ public class ValidatorImpls {
}
/**
- * Validates that the field is a valid topic name.
- */
- @NoArgsConstructor
- public static class TopicNameValidator extends Validator {
-
- @Override
- public void validateField(String name, Object o) {
- if (o == null) {
- return;
- }
- new StringValidator().validateField(name, o);
- String topic = (String) o;
- if (!TopicName.isValid(topic)) {
- throw new IllegalArgumentException(
- String.format("The topic name %s is invalid for field
'%s'", topic, name));
- }
- }
- }
-
- /**
* Validates basic types.
*/
public static class SimpleTypeValidator extends Validator {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/package-info.java
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/package-info.java
similarity index 95%
rename from
pulsar-common/src/main/java/org/apache/pulsar/common/validator/package-info.java
rename to
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/package-info.java
index cff3af6..adbc737 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/package-info.java
+++
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/package-info.java
@@ -19,4 +19,4 @@
/**
* Implementation of Validator interfaces and annotations.
*/
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/validator/ConfigValidationTest.java
b/pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ConfigValidationTest.java
similarity index 90%
rename from
pulsar-common/src/test/java/org/apache/pulsar/common/validator/ConfigValidationTest.java
rename to
pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ConfigValidationTest.java
index cd7c3bd..085ddb6 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/validator/ConfigValidationTest.java
+++
b/pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ConfigValidationTest.java
@@ -16,12 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
import org.testng.annotations.Test;
-import java.util.*;
-import static org.testng.Assert.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
public class ConfigValidationTest {
@@ -66,8 +71,6 @@ public class ConfigValidationTest {
public Map stringIntegerMap;
@ConfigValidationAnnotations.StringList
public List stringList;
- @ConfigValidationAnnotations.TopicName
- public String topic;
@ConfigValidationAnnotations.CustomType(validatorClass =
TestValidator.class)
public String customString;
}
@@ -119,14 +122,6 @@ public class ConfigValidationTest {
}
@Test
- public void testTopicName() {
- TestConfig testConfig = createGoodConfig();
- testConfig.topic = "http://google.com";
- Exception e = expectThrows(IllegalArgumentException.class, () ->
ConfigValidation.validateConfig(testConfig));
- assertTrue(e.getMessage().contains("topic"));
- }
-
- @Test
public void testCustomString() {
TestConfig testConfig = createGoodConfig();
testConfig.customString = "http://google.com";
@@ -141,7 +136,6 @@ public class ConfigValidationTest {
testConfig.integerList = testIntegerList;
testConfig.stringIntegerMap = testStringIntegerMap;
testConfig.stringList = testStringList;
- testConfig.topic = topic;
testConfig.customString = "ABCDEabcde";
return testConfig;
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/validator/ValidatorImplsTest.java
b/pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ValidatorImplsTest.java
similarity index 91%
rename from
pulsar-common/src/test/java/org/apache/pulsar/common/validator/ValidatorImplsTest.java
rename to
pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ValidatorImplsTest.java
index 474757b..be53631 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/validator/ValidatorImplsTest.java
+++
b/pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ValidatorImplsTest.java
@@ -16,9 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
-import org.apache.pulsar.common.naming.TopicName;
import org.testng.annotations.Test;
import java.net.InetSocketAddress;
@@ -60,7 +59,6 @@ public class ValidatorImplsTest {
ValidatorImpls.NotNullValidator validator = new
ValidatorImpls.NotNullValidator();
validator.validateField("fieldname", 2);
validator.validateField("fieldname", "Something");
- validator.validateField("fieldname", TopicName.get("default"));
assertThrows(IllegalArgumentException.class, () ->
validator.validateField("field", null));
}
@@ -141,15 +139,6 @@ public class ValidatorImplsTest {
}
@Test
- public void testTopicNameValidator() {
- ValidatorImpls.TopicNameValidator validator = new
ValidatorImpls.TopicNameValidator();
- validator.validateField("fieldname", "topicname");
- validator.validateField("fieldname", "public/default/topicname");
- validator.validateField("fieldname",
"persistent://public/default/topicname");
- assertThrows(IllegalArgumentException.class, () ->
validator.validateField("fieldname", "http://google.com"));
- }
-
- @Test
public void testSimpleTypeValidator() {
Map<String, Object> config = new HashMap<>();
config.put(ConfigValidationAnnotations.ValidatorParams.TYPE,
Integer.class);
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index 9043aa8..5b3e163 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -57,6 +57,12 @@
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-config-validation</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 4a33b77..2302f4b 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -37,7 +37,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.common.validator.ConfigValidation;
+import org.apache.pulsar.config.validation.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index c8e0764..a246e53 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -37,7 +37,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.common.validator.ConfigValidation;
+import org.apache.pulsar.config.validation.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 4363ea1..318ce2c 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.util.Reflections;
-import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
+import org.apache.pulsar.config.validation.ConfigValidationAnnotations;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 7c540b3..d517026 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -28,8 +28,7 @@ import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.util.Reflections;
-import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
-import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.config.validation.ConfigValidationAnnotations;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.BatchSourceTriggerer;
diff --git a/pulsar-io/data-generator/pom.xml b/pulsar-io/data-generator/pom.xml
index 5e64dd2..3af2c0f 100644
--- a/pulsar-io/data-generator/pom.xml
+++ b/pulsar-io/data-generator/pom.xml
@@ -39,10 +39,17 @@
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-config-validation</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>io.codearte.jfairy</groupId>
<artifactId>jfairy</artifactId>
<version>0.5.9</version>
</dependency>
+
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
diff --git
a/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
index 118025a..9d92859 100644
---
a/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
+++
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
@@ -26,19 +26,20 @@ import org.apache.pulsar.io.core.SourceContext;
import java.util.Map;
import java.util.Optional;
-
public class DataGeneratorSource implements Source<Person> {
private Fairy fairy;
+ private DataGeneratorSourceConfig dataGeneratorSourceConfig;
@Override
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
- this.fairy = Fairy.create();
+ dataGeneratorSourceConfig =
DataGeneratorSourceConfig.loadOrGetDefault(config);
+ this.fairy = Fairy.create();
}
@Override
public Record<Person> read() throws Exception {
- Thread.sleep(50);
+ Thread.sleep(dataGeneratorSourceConfig.getSleepBetweenMessages());
return new Record<Person>() {
@Override
public Optional<String> getKey() {
diff --git
a/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSourceConfig.java
similarity index 50%
copy from
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
copy to
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSourceConfig.java
index 118025a..7ae9aa8 100644
---
a/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
+++
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSourceConfig.java
@@ -18,42 +18,35 @@
*/
package org.apache.pulsar.io.datagenerator;
-import io.codearte.jfairy.Fairy;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Source;
-import org.apache.pulsar.io.core.SourceContext;
+import static
org.apache.pulsar.config.validation.ConfigValidationAnnotations.PositiveNumber;
-import java.util.Map;
-import java.util.Optional;
-
-
-public class DataGeneratorSource implements Source<Person> {
-
- private Fairy fairy;
-
- @Override
- public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
- this.fairy = Fairy.create();
- }
-
- @Override
- public Record<Person> read() throws Exception {
- Thread.sleep(50);
- return new Record<Person>() {
- @Override
- public Optional<String> getKey() {
- return Optional.empty();
- }
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Data;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
- @Override
- public Person getValue() {
- return new Person(fairy.person());
- }
- };
- }
-
- @Override
- public void close() throws Exception {
+import java.io.Serializable;
+import java.util.Map;
+@Data
+public class DataGeneratorSourceConfig implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "50",
+ sensitive = false,
+ help = "How long to sleep between emitting messages"
+ )
+ @PositiveNumber
+ private long sleepBetweenMessages = 50;
+
+
+ public static DataGeneratorSourceConfig loadOrGetDefault(Map<String, Object>
configMap) {
+ if (configMap.isEmpty()) {
+ return new DataGeneratorSourceConfig();
+ } else {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.convertValue(configMap, DataGeneratorSourceConfig.class);
}
+ }
}
diff --git
a/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
index 0e5d723..ce97eaa 100644
---
a/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
+++
b/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,4 +20,5 @@
name: data-generator
description: Test data generator source
sourceClass: org.apache.pulsar.io.datagenerator.DataGeneratorSource
+sourceConfigClass: org.apache.pulsar.io.datagenerator.DataGeneratorSourceConfig
sinkClass: org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink