This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e915b8e  Extract config validation class into separate module (#7734)
e915b8e is described below

commit e915b8e146f4cb735a316fcfebddf3162a149367
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Tue Aug 4 10:52:46 2020 -0700

    Extract config validation class into separate module (#7734)
    
    * Extract config validation class into separate module
    
    * fix tests
    
    * adding example
    
    * fixing
    
    Co-authored-by: Jerry Peng <[email protected]>
---
 conf/functions_worker.yml                          |  3 ++
 pom.xml                                            |  3 +-
 pulsar-config-validation/pom.xml                   | 42 +++++++++++++++
 .../config/validation}/ConfigValidation.java       | 39 +++++++++-----
 .../validation}/ConfigValidationAnnotations.java   | 11 +---
 .../config/validation}/ConfigValidationUtils.java  |  2 +-
 .../pulsar/config/validation}/Validator.java       |  2 +-
 .../pulsar/config/validation}/ValidatorImpls.java  | 38 ++++----------
 .../pulsar/config/validation}/package-info.java    |  2 +-
 .../config/validation}/ConfigValidationTest.java   | 22 +++-----
 .../config/validation}/ValidatorImplsTest.java     | 13 +----
 pulsar-functions/utils/pom.xml                     |  6 +++
 .../pulsar/functions/utils/SinkConfigUtils.java    |  2 +-
 .../pulsar/functions/utils/SourceConfigUtils.java  |  2 +-
 .../functions/utils/SinkConfigUtilsTest.java       |  2 +-
 .../functions/utils/SourceConfigUtilsTest.java     |  3 +-
 pulsar-io/data-generator/pom.xml                   |  7 +++
 .../io/datagenerator/DataGeneratorSource.java      |  7 +--
 ...rSource.java => DataGeneratorSourceConfig.java} | 61 ++++++++++------------
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 20 files changed, 146 insertions(+), 122 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 34cf09a..cf01344 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -235,3 +235,6 @@ tlsCertRefreshCheckDurationSec: 300
 
 connectorsDirectory: ./connectors
 functionsDirectory: ./functions
+
+# Should connector config be validated during submission
+validateConnectorConfig: false
diff --git a/pom.xml b/pom.xml
index 14c3b15..d817636 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,6 @@
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
-
   <parent>
     <groupId>org.apache</groupId>
     <artifactId>apache</artifactId>
@@ -1687,6 +1686,7 @@ flexible messaging model and an intuitive client 
API.</description>
         <module>dashboard</module>
         <module>pulsar-broker-auth-sasl</module>
         <module>pulsar-client-auth-sasl</module>
+        <module>pulsar-config-validation</module>
 
         <!-- transaction related modules -->
         <module>pulsar-transaction</module>
@@ -1740,6 +1740,7 @@ flexible messaging model and an intuitive client 
API.</description>
         <module>pulsar-zookeeper</module>
         <module>pulsar-broker-auth-sasl</module>
         <module>pulsar-client-auth-sasl</module>
+        <module>pulsar-config-validation</module>
 
         <!-- transaction related modules -->
         <module>pulsar-transaction</module>
diff --git a/pulsar-config-validation/pom.xml b/pulsar-config-validation/pom.xml
new file mode 100644
index 0000000..4b056a8
--- /dev/null
+++ b/pulsar-config-validation/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"
+        xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar</artifactId>
+        <version>2.7.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>pulsar-config-validation</artifactId>
+    <description>Annotation based config validation for Pulsar</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidation.java
 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidation.java
similarity index 80%
rename from 
pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidation.java
rename to 
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidation.java
index dc607ab..57da500 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidation.java
+++ 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidation.java
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
+
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
@@ -25,15 +26,19 @@ import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
 
-import lombok.extern.slf4j.Slf4j;
-
 /**
  * The class that does the validation of all the members of a given object.
  */
-@Slf4j
 public class ConfigValidation {
 
-    public static void validateConfig(Object config) {
+    private static final Class DEFAULT_ANNOTATION_CLASS = 
ConfigValidationAnnotations.class;
+
+    /**
+     * Validate the config object with annotations from annotationClass
+     * @param config config object
+     * @param annotationClass class with annotations to use
+     */
+    public static void validateConfig(Object config, Class annotationClass) {
         for (Field field : config.getClass().getDeclaredFields()) {
             Object value = null;
             field.setAccessible(true);
@@ -42,25 +47,33 @@ public class ConfigValidation {
             } catch (IllegalAccessException e) {
                 throw new RuntimeException(e);
             }
-            validateField(field, value);
+            validateField(field, value, annotationClass);
         }
-        validateClass(config);
+        validateClass(config, annotationClass);
+    }
+
+    /**
+     * Validate the config object with default annotation class
+     * @param config config object
+     */
+    public static void validateConfig(Object config) {
+        validateConfig(config, DEFAULT_ANNOTATION_CLASS);
     }
 
-    private static void validateClass(Object config) {
-        processAnnotations(config.getClass().getAnnotations(), 
config.getClass().getName(), config);
+    private static void validateClass(Object config, Class annotationClass) {
+        processAnnotations(config.getClass().getAnnotations(), 
config.getClass().getName(), config, annotationClass);
     }
 
-    private static void validateField(Field field, Object value) {
-        processAnnotations(field.getAnnotations(), field.getName(), value);
+    private static void validateField(Field field, Object value, Class 
annotationClass) {
+        processAnnotations(field.getAnnotations(), field.getName(), value, 
annotationClass);
     }
 
-    private static void processAnnotations(Annotation[] annotations, String 
fieldName, Object value) {
+    private static void processAnnotations(Annotation[] annotations, String 
fieldName, Object value, Class annotationClass) {
         try {
             for (Annotation annotation : annotations) {
                 String type = annotation.annotationType().getName();
                 Class<?> validatorClass = null;
-                Class<?>[] classes = 
ConfigValidationAnnotations.class.getDeclaredClasses();
+                Class<?>[] classes = annotationClass.getDeclaredClasses();
                 //check if annotation is one of our
                 for (Class<?> clazz : classes) {
                     if (clazz.getName().equals(type)) {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationAnnotations.java
 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationAnnotations.java
similarity index 94%
rename from 
pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationAnnotations.java
rename to 
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationAnnotations.java
index 1e95430..aa7c3c2 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationAnnotations.java
+++ 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationAnnotations.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
 
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
@@ -132,15 +132,6 @@ public class ConfigValidationAnnotations {
     }
 
     /**
-     * checks if the topic name is valid.
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface TopicName {
-        Class<?> validatorClass() default 
ValidatorImpls.TopicNameValidator.class;
-    }
-
-    /**
      * Field names for annotations.
      */
     public static class ValidatorParams {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationUtils.java
 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationUtils.java
similarity index 99%
rename from 
pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationUtils.java
rename to 
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationUtils.java
index 2add624..ee393e8 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationUtils.java
+++ 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ConfigValidationUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
 
 import java.util.Map;
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/Validator.java 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/Validator.java
similarity index 96%
rename from 
pulsar-common/src/main/java/org/apache/pulsar/common/validator/Validator.java
rename to 
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/Validator.java
index 3e78a18..77032e3 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/Validator.java
+++ 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/Validator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
 
 import java.util.Map;
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ValidatorImpls.java
 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ValidatorImpls.java
similarity index 94%
rename from 
pulsar-common/src/main/java/org/apache/pulsar/common/validator/ValidatorImpls.java
rename to 
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ValidatorImpls.java
index d549329..23cb924 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/ValidatorImpls.java
+++ 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/ValidatorImpls.java
@@ -16,22 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.naming.TopicName;
-
 /**
  * System defined Validator Annotations.
  */
-@Slf4j
 public class ValidatorImpls {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ValidatorImpls.class);
+
     /**
      * Validates a positive number.
      */
@@ -272,9 +273,12 @@ public class ValidatorImpls {
     /**
      * validates that the string is equal to one of the specified ones in the 
list.
      */
-    @NoArgsConstructor
     public static class StringValidator extends Validator {
 
+        public StringValidator() {
+
+        }
+
         private HashSet<String> acceptedValues = null;
 
         public StringValidator(Map<String, Object> params) {
@@ -350,26 +354,6 @@ public class ValidatorImpls {
     }
 
     /**
-     * Validates that the field is a valid topic name.
-     */
-    @NoArgsConstructor
-    public static class TopicNameValidator extends Validator {
-
-        @Override
-        public void validateField(String name, Object o) {
-            if (o == null) {
-                return;
-            }
-            new StringValidator().validateField(name, o);
-            String topic = (String) o;
-            if (!TopicName.isValid(topic)) {
-                throw new IllegalArgumentException(
-                        String.format("The topic name %s is invalid for field 
'%s'", topic, name));
-            }
-        }
-    }
-
-    /**
      * Validates basic types.
      */
     public static class SimpleTypeValidator extends Validator {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/package-info.java
 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/package-info.java
similarity index 95%
rename from 
pulsar-common/src/main/java/org/apache/pulsar/common/validator/package-info.java
rename to 
pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/package-info.java
index cff3af6..adbc737 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/validator/package-info.java
+++ 
b/pulsar-config-validation/src/main/java/org/apache/pulsar/config/validation/package-info.java
@@ -19,4 +19,4 @@
 /**
  * Implementation of Validator interfaces and annotations.
  */
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/validator/ConfigValidationTest.java
 
b/pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ConfigValidationTest.java
similarity index 90%
rename from 
pulsar-common/src/test/java/org/apache/pulsar/common/validator/ConfigValidationTest.java
rename to 
pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ConfigValidationTest.java
index cd7c3bd..085ddb6 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/validator/ConfigValidationTest.java
+++ 
b/pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ConfigValidationTest.java
@@ -16,12 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
 
 import org.testng.annotations.Test;
 
-import java.util.*;
-import static org.testng.Assert.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 
 public class ConfigValidationTest {
 
@@ -66,8 +71,6 @@ public class ConfigValidationTest {
         public Map stringIntegerMap;
         @ConfigValidationAnnotations.StringList
         public List stringList;
-        @ConfigValidationAnnotations.TopicName
-        public String topic;
         @ConfigValidationAnnotations.CustomType(validatorClass = 
TestValidator.class)
         public String customString;
     }
@@ -119,14 +122,6 @@ public class ConfigValidationTest {
     }
 
     @Test
-    public void testTopicName() {
-        TestConfig testConfig = createGoodConfig();
-        testConfig.topic = "http://google.com";
-        Exception e = expectThrows(IllegalArgumentException.class, () -> 
ConfigValidation.validateConfig(testConfig));
-        assertTrue(e.getMessage().contains("topic"));
-    }
-
-    @Test
     public void testCustomString() {
         TestConfig testConfig = createGoodConfig();
         testConfig.customString = "http://google.com";
@@ -141,7 +136,6 @@ public class ConfigValidationTest {
         testConfig.integerList = testIntegerList;
         testConfig.stringIntegerMap = testStringIntegerMap;
         testConfig.stringList = testStringList;
-        testConfig.topic = topic;
         testConfig.customString = "ABCDEabcde";
         return testConfig;
     }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/validator/ValidatorImplsTest.java
 
b/pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ValidatorImplsTest.java
similarity index 91%
rename from 
pulsar-common/src/test/java/org/apache/pulsar/common/validator/ValidatorImplsTest.java
rename to 
pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ValidatorImplsTest.java
index 474757b..be53631 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/validator/ValidatorImplsTest.java
+++ 
b/pulsar-config-validation/src/test/java/org/apache/pulsar/config/validation/ValidatorImplsTest.java
@@ -16,9 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.validator;
+package org.apache.pulsar.config.validation;
 
-import org.apache.pulsar.common.naming.TopicName;
 import org.testng.annotations.Test;
 
 import java.net.InetSocketAddress;
@@ -60,7 +59,6 @@ public class ValidatorImplsTest {
         ValidatorImpls.NotNullValidator validator = new 
ValidatorImpls.NotNullValidator();
         validator.validateField("fieldname", 2);
         validator.validateField("fieldname", "Something");
-        validator.validateField("fieldname", TopicName.get("default"));
         assertThrows(IllegalArgumentException.class, () -> 
validator.validateField("field", null));
     }
 
@@ -141,15 +139,6 @@ public class ValidatorImplsTest {
     }
 
     @Test
-    public void testTopicNameValidator() {
-        ValidatorImpls.TopicNameValidator validator = new 
ValidatorImpls.TopicNameValidator();
-        validator.validateField("fieldname", "topicname");
-        validator.validateField("fieldname", "public/default/topicname");
-        validator.validateField("fieldname", 
"persistent://public/default/topicname");
-        assertThrows(IllegalArgumentException.class, () -> 
validator.validateField("fieldname", "http://google.com"));
-    }
-
-    @Test
     public void testSimpleTypeValidator() {
         Map<String, Object> config = new HashMap<>();
         config.put(ConfigValidationAnnotations.ValidatorParams.TYPE, 
Integer.class);
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index 9043aa8..5b3e163 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -57,6 +57,12 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-config-validation</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-yaml</artifactId>
     </dependency>
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 4a33b77..2302f4b 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -37,7 +37,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.common.validator.ConfigValidation;
+import org.apache.pulsar.config.validation.ConfigValidation;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index c8e0764..a246e53 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -37,7 +37,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.common.validator.ConfigValidation;
+import org.apache.pulsar.config.validation.ConfigValidation;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 4363ea1..318ce2c 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.util.Reflections;
-import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
+import org.apache.pulsar.config.validation.ConfigValidationAnnotations;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 7c540b3..d517026 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -28,8 +28,7 @@ import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.util.Reflections;
-import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
-import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.config.validation.ConfigValidationAnnotations;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.io.core.BatchSourceTriggerer;
diff --git a/pulsar-io/data-generator/pom.xml b/pulsar-io/data-generator/pom.xml
index 5e64dd2..3af2c0f 100644
--- a/pulsar-io/data-generator/pom.xml
+++ b/pulsar-io/data-generator/pom.xml
@@ -39,10 +39,17 @@
         </dependency>
 
         <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-config-validation</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>io.codearte.jfairy</groupId>
             <artifactId>jfairy</artifactId>
             <version>0.5.9</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
diff --git 
a/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
 
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
index 118025a..9d92859 100644
--- 
a/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
+++ 
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
@@ -26,19 +26,20 @@ import org.apache.pulsar.io.core.SourceContext;
 import java.util.Map;
 import java.util.Optional;
 
-
 public class DataGeneratorSource implements Source<Person> {
 
     private Fairy fairy;
+    private DataGeneratorSourceConfig dataGeneratorSourceConfig;
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
-       this.fairy = Fairy.create();
+        dataGeneratorSourceConfig = 
DataGeneratorSourceConfig.loadOrGetDefault(config);
+        this.fairy = Fairy.create();
     }
 
     @Override
     public Record<Person> read() throws Exception {
-        Thread.sleep(50);
+        Thread.sleep(dataGeneratorSourceConfig.getSleepBetweenMessages());
         return new Record<Person>() {
             @Override
             public Optional<String> getKey() {
diff --git 
a/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
 
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSourceConfig.java
similarity index 50%
copy from 
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
copy to 
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSourceConfig.java
index 118025a..7ae9aa8 100644
--- 
a/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
+++ 
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSourceConfig.java
@@ -18,42 +18,35 @@
  */
 package org.apache.pulsar.io.datagenerator;
 
-import io.codearte.jfairy.Fairy;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Source;
-import org.apache.pulsar.io.core.SourceContext;
+import static 
org.apache.pulsar.config.validation.ConfigValidationAnnotations.PositiveNumber;
 
-import java.util.Map;
-import java.util.Optional;
-
-
-public class DataGeneratorSource implements Source<Person> {
-
-    private Fairy fairy;
-
-    @Override
-    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
-       this.fairy = Fairy.create();
-    }
-
-    @Override
-    public Record<Person> read() throws Exception {
-        Thread.sleep(50);
-        return new Record<Person>() {
-            @Override
-            public Optional<String> getKey() {
-                return Optional.empty();
-            }
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Data;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
-            @Override
-            public Person getValue() {
-                return new Person(fairy.person());
-            }
-        };
-    }
-
-    @Override
-    public void close() throws Exception {
+import java.io.Serializable;
+import java.util.Map;
 
+@Data
+public class DataGeneratorSourceConfig implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  @FieldDoc(
+    required = true,
+    defaultValue = "50",
+    sensitive = false,
+    help = "How long to sleep between emitting messages"
+  )
+  @PositiveNumber
+  private long sleepBetweenMessages = 50;
+
+
+  public static DataGeneratorSourceConfig loadOrGetDefault(Map<String, Object> 
configMap) {
+    if (configMap.isEmpty()) {
+      return new DataGeneratorSourceConfig();
+    } else {
+      ObjectMapper mapper = new ObjectMapper();
+      return mapper.convertValue(configMap, DataGeneratorSourceConfig.class);
     }
+  }
 }
diff --git 
a/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
index 0e5d723..ce97eaa 100644
--- 
a/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
+++ 
b/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,4 +20,5 @@
 name: data-generator
 description: Test data generator source
 sourceClass: org.apache.pulsar.io.datagenerator.DataGeneratorSource
+sourceConfigClass: org.apache.pulsar.io.datagenerator.DataGeneratorSourceConfig
 sinkClass: org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink

Reply via email to