This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3515ae7  Converting global variables to local
3515ae7 is described below

commit 3515ae72c7f25399120bd07f0fa2ac74bb4895d5
Author: Naburun Nag <n...@cs.wisc.edu>
AuthorDate: Wed Feb 19 09:03:10 2020 -0800

    Converting global variables to local
---
 .../org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java  |  3 +--
 .../apache/geode/kafka/source/GeodeKafkaSourceTask.java  |  3 +--
 .../geode/kafka/source/GeodeSourceConnectorConfig.java   |  3 +--
 .../org/apache/geode/kafka/GeodeAsSinkDUnitTest.java     | 16 ++++++----------
 .../org/apache/geode/kafka/GeodeAsSourceDUnitTest.java   | 16 ++++++----------
 5 files changed, 15 insertions(+), 26 deletions(-)

diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java 
b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index eaf0f66..daf2274 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -40,7 +40,6 @@ public class GeodeKafkaSinkTask extends SinkTask {
   private static final Logger logger = 
LoggerFactory.getLogger(GeodeKafkaSinkTask.class);
 
   private GeodeContext geodeContext;
-  private int taskId;
   private Map<String, List<String>> topicToRegions;
   private Map<String, Region> regionNameToRegion;
   private boolean nullValuesMeansRemove = true;
@@ -73,7 +72,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
 
   void configure(GeodeSinkConnectorConfig geodeConnectorConfig) {
     logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() 
+ " starting");
-    taskId = geodeConnectorConfig.getTaskId();
+    int taskId = geodeConnectorConfig.getTaskId();
     topicToRegions = geodeConnectorConfig.getTopicToRegions();
     nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
   }
diff --git 
a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java 
b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 4e5b415..2d5abe4 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -43,7 +43,6 @@ public class GeodeKafkaSourceTask extends SourceTask {
   private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
 
   private GeodeContext geodeContext;
-  private GeodeSourceConnectorConfig geodeConnectorConfig;
   private EventBufferSupplier eventBufferSupplier;
   private Map<String, List<String>> regionToTopics;
   private Map<String, Map<String, String>> sourcePartitions;
@@ -64,7 +63,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
   @Override
   public void start(Map<String, String> props) {
     try {
-      geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
+      GeodeSourceConnectorConfig geodeConnectorConfig = new 
GeodeSourceConnectorConfig(props);
       logger.debug("GeodeKafkaSourceTask id:" + 
geodeConnectorConfig.getTaskId() + " starting");
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
diff --git 
a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java 
b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
index ac70051..a004f23 100644
--- 
a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ 
b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -54,7 +54,6 @@ public class GeodeSourceConnectorConfig extends 
GeodeConnectorConfig {
   public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
 
   private final String durableClientId;
-  private final String durableClientIdPrefix;
   private final String durableClientTimeout;
   private final String cqPrefix;
   private final boolean loadEntireRegion;
@@ -68,7 +67,7 @@ public class GeodeSourceConnectorConfig extends 
GeodeConnectorConfig {
     super(SOURCE_CONFIG_DEF, connectorProperties);
     cqsToRegister = parseRegionToTopics(getString(CQS_TO_REGISTER)).keySet();
     regionToTopics = parseRegionToTopics(getString(REGION_TO_TOPIC_BINDINGS));
-    durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX);
+    String durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX);
     if (isDurable(durableClientIdPrefix)) {
       durableClientId = durableClientIdPrefix + taskId;
     } else {
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java 
b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
index 931de80..d8f1ab5 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -42,9 +42,6 @@ public class GeodeAsSinkDUnitTest {
   @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
 
-  private static MemberVM locator, server;
-  private static ClientVM client;
-
   @Rule
   public TestName testName = new TestName();
 
@@ -91,12 +88,11 @@ public class GeodeAsSinkDUnitTest {
   @Test
   public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() 
throws Exception {
 
-    locator = clusterStartupRule.startLocatorVM(0, 10334);
+    MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334);
     int locatorPort = locator.getPort();
-    server = clusterStartupRule.startServerVM(1, locatorPort);
-    client =
-        clusterStartupRule
-            .startClientVM(2, client -> 
client.withLocatorConnection(locatorPort));
+    MemberVM server = clusterStartupRule.startServerVM(1, locatorPort);
+    ClientVM client1 = clusterStartupRule
+        .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
     int NUM_EVENT = 10;
 
     // Set unique names for all the different components
@@ -132,7 +128,7 @@ public class GeodeAsSinkDUnitTest {
       workerAndHerderCluster = startWorkerAndHerderCluster(numTask, 
sourceRegion, sinkRegion,
           sourceTopic, sinkTopic, 
temporaryFolderForOffset.getRoot().getAbsolutePath(),
           "localhost[" + locatorPort + "]");
-      client.invoke(() -> {
+      client1.invoke(() -> {
         
ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
             .create(sinkRegion);
       });
@@ -143,7 +139,7 @@ public class GeodeAsSinkDUnitTest {
         producer.send(new ProducerRecord(sinkTopic, "KEY" + i, "VALUE" + i));
       }
 
-      client.invoke(() -> {
+      client1.invoke(() -> {
         Region region = 
ClusterStartupRule.getClientCache().getRegion(sinkRegion);
         await().atMost(10, TimeUnit.SECONDS)
             .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java 
b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
index 8b41d3e..7a0f05f 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
@@ -54,9 +54,6 @@ public class GeodeAsSourceDUnitTest {
   @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
 
-  private static MemberVM locator, server;
-  private static ClientVM client;
-
 
   @Rule
   public TestName testName = new TestName();
@@ -103,12 +100,11 @@ public class GeodeAsSourceDUnitTest {
 
   @Test
   public void 
whenDataIsInsertedInGeodeSourceThenKafkaConsumerMustReceiveEvents() throws 
Exception {
-    locator = clusterStartupRule.startLocatorVM(0, 10334);
+    MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334);
     int locatorPort = locator.getPort();
-    server = clusterStartupRule.startServerVM(1, locatorPort);
-    client =
-        clusterStartupRule
-            .startClientVM(2, client -> 
client.withLocatorConnection(locatorPort));
+    MemberVM server = clusterStartupRule.startServerVM(1, locatorPort);
+    ClientVM client1 = clusterStartupRule
+        .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
     int NUM_EVENT = 10;
 
     // Set unique names for all the different components
@@ -128,7 +124,7 @@ public class GeodeAsSourceDUnitTest {
       
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
           .create(sinkRegion);
     });
-    client.invoke(() -> {
+    client1.invoke(() -> {
       
ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
           .create(sourceRegion);
     });
@@ -152,7 +148,7 @@ public class GeodeAsSourceDUnitTest {
       Consumer<String, String> consumer = createConsumer(sourceTopic);
 
       // Insert data into the Apache Geode source from the client
-      client.invoke(() -> {
+      client1.invoke(() -> {
         Region region = 
ClusterStartupRule.getClientCache().getRegion(sourceRegion);
         for (int i = 0; i < NUM_EVENT; i++) {
           region.put("KEY" + i, "VALUE" + i);

Reply via email to