This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 77c9a5a579dc2d19665569b19874d33c76f1e9a9 Author: Naburun Nag <n...@cs.wisc.edu> AuthorDate: Tue Feb 25 14:51:51 2020 -0800 Adding the configuration changes as per spec --- pom.xml | 9 +-- .../apache/geode/kafka/GeodeConnectorConfig.java | 66 ++++++++++++---- .../geode/kafka/sink/GeodeSinkConnectorConfig.java | 33 +++++--- .../kafka/source/GeodeSourceConnectorConfig.java | 88 +++++++++++++++++----- 4 files changed, 145 insertions(+), 51 deletions(-) diff --git a/pom.xml b/pom.xml index b6dc832..6c2676b 100644 --- a/pom.xml +++ b/pom.xml @@ -227,17 +227,10 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>${maven-plugin.version}</version> + <inherited>true</inherited> <configuration> <source>1.8</source> <target>1.8</target> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>${maven-plugin.version}</version> - <inherited>true</inherited> - <configuration> <compilerArgs> <arg>-Xlint:-processing</arg> <arg>-Xlint:all</arg> diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java index 717fef6..05dd5f2 100644 --- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java +++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java @@ -25,6 +25,8 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.storage.StringConverter; +import org.apache.geode.annotations.VisibleForTesting; + public class GeodeConnectorConfig extends AbstractConfig { // GeodeKafka Specific Configuration @@ -35,13 +37,24 @@ public class GeodeConnectorConfig extends AbstractConfig { /** * Specifies which Locators to connect to Apache Geode */ - public static final String LOCATORS = "locators"; - public static final String DEFAULT_LOCATOR = "localhost[10334]"; - public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init"; + private static final String LOCATORS = "locators"; + private static final String DEFAULT_LOCATOR = "localhost[10334]"; + private static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init"; private static final String DEFAULT_SECURITY_AUTH_INIT = "org.apache.geode.kafka.security.SystemPropertyAuthInit"; - public static final String SECURITY_USER = "security-username"; - public static final String SECURITY_PASSWORD = "security-password"; + private static final String SECURITY_USER = "security-username"; + private static final String SECURITY_PASSWORD = "security-password"; + private static final String TASK_ID_DOCUMENTATION = "Internally used to identify each task"; + private static final String + LOCATORS_DOCUMENTATION = + "A comma separated string of locators that configure which locators to connect to"; + private static final String + SECURITY_USER_DOCUMENTATION = + "Supply a username to be used to authenticate with Geode. Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user"; + private static final String SECURITY_PASSWORD_DOCUMENTATION = "Supply a password to be used to authenticate with Geode"; + private static final String + SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION = + "Point to the Java class that implements the [AuthInitialize Interface](https://geode.apache.org/docs/guide/19/managing/security/implementing_authentication.html)"; public static final String DEFAULT_KEY_CONVERTER = StringConverter.class.getCanonicalName(); public static final String DEFAULT_VALUE_CONVERTER = StringConverter.class.getCanonicalName(); @@ -52,7 +65,7 @@ public class GeodeConnectorConfig extends AbstractConfig { private String securityUserName; private String securityPassword; - // Just for testing + @VisibleForTesting protected GeodeConnectorConfig() { super(new ConfigDef(), new HashMap<>()); taskId = 0; @@ -75,17 +88,38 @@ public class GeodeConnectorConfig extends AbstractConfig { protected static ConfigDef configurables() { ConfigDef configDef = new ConfigDef(); - configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM, - "Internally used to identify each task"); - configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH, - "A comma separated string of locators that configure which locators to connect to"); - configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, - "Supply a username to be used to authenticate with Geode. Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user"); - configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, - "Supply a password to be used to authenticate with Geode"); - configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null, + configDef.define( + TASK_ID, + ConfigDef.Type.INT, + "0", + ConfigDef.Importance.MEDIUM, + TASK_ID_DOCUMENTATION); + configDef.define( + LOCATORS, + ConfigDef.Type.STRING, + DEFAULT_LOCATOR, + ConfigDef.Importance.HIGH, + LOCATORS_DOCUMENTATION); + configDef.define( + SECURITY_USER, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.HIGH, + SECURITY_USER_DOCUMENTATION); + + configDef.define( + SECURITY_PASSWORD, + ConfigDef.Type.PASSWORD, + null, + ConfigDef.Importance.HIGH, + SECURITY_PASSWORD_DOCUMENTATION); + + configDef.define( + SECURITY_CLIENT_AUTH_INIT, + ConfigDef.Type.PASSWORD, + null, ConfigDef.Importance.HIGH, - "Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)"); + SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION); return configDef; } diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java index cd49778..46cd6c0 100644 --- a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java +++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java @@ -25,10 +25,16 @@ public class GeodeSinkConnectorConfig extends GeodeConnectorConfig { public static final ConfigDef SINK_CONFIG_DEF = configurables(); // Used by sink - public static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions"; - public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]"; - public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove"; - public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true"; + private static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions"; + private static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]"; + private static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove"; + private static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true"; + private static final String + NULL_VALUES_MEAN_REMOVE_DOCUMENTATION = + "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region"; + private static final String + TOPIC_TO_REGION_BINDINGS_DOCUMENTATION = + "A comma separated list of \"one topic to many regions\" bindings. Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]"; private final Map<String, List<String>> topicToRegions; private final boolean nullValuesMeanRemove; @@ -41,12 +47,19 @@ public class GeodeSinkConnectorConfig extends GeodeConnectorConfig { protected static ConfigDef configurables() { ConfigDef configDef = GeodeConnectorConfig.configurables(); - configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING, - DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH, - "A comma separated list of \"one topic to many regions\" bindings. Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]"); - configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN, - DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM, - "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region"); + configDef.define( + TOPIC_TO_REGION_BINDINGS, + ConfigDef.Type.STRING, + DEFAULT_TOPIC_TO_REGION_BINDING, + ConfigDef.Importance.HIGH, + TOPIC_TO_REGION_BINDINGS_DOCUMENTATION); + + configDef.define( + NULL_VALUES_MEAN_REMOVE, + ConfigDef.Type.BOOLEAN, + DEFAULT_NULL_VALUES_MEAN_REMOVE, + ConfigDef.Importance.MEDIUM, + NULL_VALUES_MEAN_REMOVE_DOCUMENTATION); return configDef; } diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java index 01294c9..05d4fd6 100644 --- a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java @@ -53,6 +53,27 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { public static final String LOAD_ENTIRE_REGION = "load-entire-region"; public static final String DEFAULT_LOAD_ENTIRE_REGION = "false"; + private static final String + CQS_TO_REGISTER_DOCUMENTATION = + "Internally created and used parameter, for signalling a task to register CQs on Apache Geode"; + private static final String + REGION_TO_TOPIC_BINDINGS_DOCUMENTATION = + "A comma separated list of \"one region to many topics\" mappings. Each mapping is surrounded by brackets. For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\""; + private static final String + DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION = + "Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client"; + private static final String + LOAD_ENTIRE_REGION_DOCUMENTATION = + "Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a CQ"; + private static final String + DURABLE_CLIENT_TIME_OUT_DOCUMENTATION = + "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated"; + private static final String CQ_PREFIX_DOCUMENTATION = "Prefix string to identify Connector CQ's on a Geode server"; + private static final String BATCH_SIZE_DOCUMENTATION = "Maximum number of records to return on each poll"; + private static final String + QUEUE_SIZE_DOCUMENTATION = + "Maximum number of entries in the connector queue before backing up all Geode CQ listeners sharing the task queue "; + private final String durableClientId; private final String durableClientTimeout; private final String cqPrefix; @@ -82,27 +103,60 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { protected static ConfigDef configurables() { ConfigDef configDef = GeodeConnectorConfig.configurables(); - configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, - "Internally created and used parameter, for signalling a task to register cqs"); - configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING, - DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH, - "A comma separated list of \"one region to many topics\" mappings. Each mapping is surrounded by brackets. For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\""); - configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID, + + configDef.define( + CQS_TO_REGISTER, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.HIGH, + CQS_TO_REGISTER_DOCUMENTATION); + + configDef.define( + REGION_TO_TOPIC_BINDINGS, + ConfigDef.Type.STRING, + DEFAULT_REGION_TO_TOPIC_BINDING, + ConfigDef.Importance.HIGH, + REGION_TO_TOPIC_BINDINGS_DOCUMENTATION); + + configDef.define( + DURABLE_CLIENT_ID_PREFIX, + ConfigDef.Type.STRING, + DEFAULT_DURABLE_CLIENT_ID, ConfigDef.Importance.LOW, - "Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client"); - configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT, + DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION); + + configDef.define(DURABLE_CLIENT_TIME_OUT, + ConfigDef.Type.STRING, + DEFAULT_DURABLE_CLIENT_TIMEOUT, + ConfigDef.Importance.LOW, + DURABLE_CLIENT_TIME_OUT_DOCUMENTATION); + + configDef.define(CQ_PREFIX, + ConfigDef.Type.STRING, + DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW, - "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated"); - configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW, - "Prefix string to identify Connector cq's on a Geode server"); - configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE, - ConfigDef.Importance.MEDIUM, "Maximum number of records to return on each poll"); - configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE, + CQ_PREFIX_DOCUMENTATION); + + configDef.define( + BATCH_SIZE, + ConfigDef.Type.INT, + DEFAULT_BATCH_SIZE, ConfigDef.Importance.MEDIUM, - "Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue "); - configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION, + BATCH_SIZE_DOCUMENTATION); + + configDef.define( + QUEUE_SIZE, + ConfigDef.Type.INT, + DEFAULT_QUEUE_SIZE, ConfigDef.Importance.MEDIUM, - "Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq"); + QUEUE_SIZE_DOCUMENTATION); + + configDef.define(LOAD_ENTIRE_REGION, + ConfigDef.Type.BOOLEAN, + DEFAULT_LOAD_ENTIRE_REGION, + ConfigDef.Importance.MEDIUM, + LOAD_ENTIRE_REGION_DOCUMENTATION); + return configDef; }