This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 63fd78f0dd0dc683b2235247511e3377b0809150 Author: Jason Huynh <[email protected]> AuthorDate: Fri Jan 31 16:42:01 2020 -0800 Removed system outs Passing through the security auth init property --- src/main/java/geode/kafka/GeodeConnectorConfig.java | 7 +++++++ src/main/java/geode/kafka/GeodeContext.java | 16 +++++++++++----- src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java | 3 +-- .../java/geode/kafka/source/GeodeKafkaSourceTask.java | 6 +----- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java index 88f5a02..ac9a31f 100644 --- a/src/main/java/geode/kafka/GeodeConnectorConfig.java +++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java @@ -32,9 +32,11 @@ public class GeodeConnectorConfig { */ public static final String LOCATORS = "locators"; public static final String DEFAULT_LOCATOR = "localhost[10334]"; + public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init"; protected final int taskId; protected List<LocatorHostPort> locatorHostPorts; + private String securityClientAuthInit; protected GeodeConnectorConfig() { taskId = 0; @@ -43,6 +45,7 @@ public class GeodeConnectorConfig { public GeodeConnectorConfig(Map<String, String> connectorProperties) { taskId = Integer.parseInt(connectorProperties.get(TASK_ID)); locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS)); + securityClientAuthInit = connectorProperties.get(SECURITY_CLIENT_AUTH_INIT); } @@ -116,4 +119,8 @@ public class GeodeConnectorConfig { public List<LocatorHostPort> getLocatorHostPorts() { return locatorHostPorts; } + + public String getSecurityClientAuthInit() { + return securityClientAuthInit; + } } diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java index 2078782..ff8a8c3 100644 --- a/src/main/java/geode/kafka/GeodeContext.java +++ b/src/main/java/geode/kafka/GeodeContext.java @@ -26,6 +26,8 @@ import org.apache.kafka.connect.errors.ConnectException; import java.util.Collection; import java.util.List; +import static geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT; + public class GeodeContext { private ClientCache clientCache; @@ -34,13 +36,13 @@ public class GeodeContext { public GeodeContext() { } - public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout) { - clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout); + public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout, String securityAuthInit) { + clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout, securityAuthInit); return clientCache; } - public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList) { - clientCache = createClientCache(locatorHostPortList, "", ""); + public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String securityAuthInit) { + clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit); return clientCache; } @@ -55,8 +57,12 @@ public class GeodeContext { * @param durableClientTimeOut * @return */ - public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) { + public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut, String securityAuthInit) { ClientCacheFactory ccf = new ClientCacheFactory(); + + if (securityAuthInit != null) { + ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit); + } if (!durableClientName.equals("")) { ccf.set("durable-client-id", durableClientName) .set("durable-client-timeout", durableClientTimeOut); diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java index a1d7ecf..66e528f 100644 --- a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java +++ b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java @@ -58,12 +58,11 @@ public class GeodeKafkaSinkTask extends SinkTask { GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props); logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting"); geodeContext = new GeodeContext(); - geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts()); + geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getSecurityClientAuthInit()); topicToRegions = geodeConnectorConfig.getTopicToRegions(); regionNameToRegion = createProxyRegions(topicToRegions.values()); nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove(); } catch (Exception e) { - e.printStackTrace(); logger.error("Unable to start sink task", e); throw e; } diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java index 0f6b50c..8d827a1 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java @@ -68,11 +68,10 @@ public class GeodeKafkaSourceTask extends SourceTask { @Override public void start(Map<String, String> props) { try { - System.out.println("JASON start task"); geodeConnectorConfig = new GeodeSourceConnectorConfig(props); logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting"); geodeContext = new GeodeContext(); - geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout()); + geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(), geodeConnectorConfig.getSecurityClientAuthInit()); batchSize = Integer.parseInt(props.get(BATCH_SIZE)); int queueSize = Integer.parseInt(props.get(QUEUE_SIZE)); @@ -86,12 +85,9 @@ public class GeodeKafkaSourceTask extends SourceTask { boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion(); installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion); } catch (Exception e) { - System.out.println("JASON start task failed" + e); - e.printStackTrace(); logger.error("Unable to start source task", e); throw e; } - System.out.println("JASON end task"); }
