This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 3515ae7 Converting global variables to local 3515ae7 is described below commit 3515ae72c7f25399120bd07f0fa2ac74bb4895d5 Author: Naburun Nag <n...@cs.wisc.edu> AuthorDate: Wed Feb 19 09:03:10 2020 -0800 Converting global variables to local --- .../org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java | 3 +-- .../apache/geode/kafka/source/GeodeKafkaSourceTask.java | 3 +-- .../geode/kafka/source/GeodeSourceConnectorConfig.java | 3 +-- .../org/apache/geode/kafka/GeodeAsSinkDUnitTest.java | 16 ++++++---------- .../org/apache/geode/kafka/GeodeAsSourceDUnitTest.java | 16 ++++++---------- 5 files changed, 15 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java index eaf0f66..daf2274 100644 --- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java +++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java @@ -40,7 +40,6 @@ public class GeodeKafkaSinkTask extends SinkTask { private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class); private GeodeContext geodeContext; - private int taskId; private Map<String, List<String>> topicToRegions; private Map<String, Region> regionNameToRegion; private boolean nullValuesMeansRemove = true; @@ -73,7 +72,7 @@ public class GeodeKafkaSinkTask extends SinkTask { void configure(GeodeSinkConnectorConfig geodeConnectorConfig) { logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting"); - taskId = geodeConnectorConfig.getTaskId(); + int taskId = geodeConnectorConfig.getTaskId(); topicToRegions = geodeConnectorConfig.getTopicToRegions(); nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove(); } diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java index 4e5b415..2d5abe4 100644 --- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java @@ -43,7 +43,6 @@ public class GeodeKafkaSourceTask extends SourceTask { private static final Map<String, Long> OFFSET_DEFAULT = createOffset(); private GeodeContext geodeContext; - private GeodeSourceConnectorConfig geodeConnectorConfig; private EventBufferSupplier eventBufferSupplier; private Map<String, List<String>> regionToTopics; private Map<String, Map<String, String>> sourcePartitions; @@ -64,7 +63,7 @@ public class GeodeKafkaSourceTask extends SourceTask { @Override public void start(Map<String, String> props) { try { - geodeConnectorConfig = new GeodeSourceConnectorConfig(props); + GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props); logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting"); geodeContext = new GeodeContext(); geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java index ac70051..a004f23 100644 --- a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java @@ -54,7 +54,6 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { public static final String DEFAULT_LOAD_ENTIRE_REGION = "false"; private final String durableClientId; - private final String durableClientIdPrefix; private final String durableClientTimeout; private final String cqPrefix; private final boolean loadEntireRegion; @@ -68,7 +67,7 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { super(SOURCE_CONFIG_DEF, connectorProperties); cqsToRegister = parseRegionToTopics(getString(CQS_TO_REGISTER)).keySet(); regionToTopics = parseRegionToTopics(getString(REGION_TO_TOPIC_BINDINGS)); - durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX); + String durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX); if (isDurable(durableClientIdPrefix)) { durableClientId = durableClientIdPrefix + taskId; } else { diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java index 931de80..d8f1ab5 100644 --- a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java +++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java @@ -42,9 +42,6 @@ public class GeodeAsSinkDUnitTest { @Rule public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); - private static MemberVM locator, server; - private static ClientVM client; - @Rule public TestName testName = new TestName(); @@ -91,12 +88,11 @@ public class GeodeAsSinkDUnitTest { @Test public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception { - locator = clusterStartupRule.startLocatorVM(0, 10334); + MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334); int locatorPort = locator.getPort(); - server = clusterStartupRule.startServerVM(1, locatorPort); - client = - clusterStartupRule - .startClientVM(2, client -> client.withLocatorConnection(locatorPort)); + MemberVM server = clusterStartupRule.startServerVM(1, locatorPort); + ClientVM client1 = clusterStartupRule + .startClientVM(2, client -> client.withLocatorConnection(locatorPort)); int NUM_EVENT = 10; // Set unique names for all the different components @@ -132,7 +128,7 @@ public class GeodeAsSinkDUnitTest { workerAndHerderCluster = startWorkerAndHerderCluster(numTask, sourceRegion, sinkRegion, sourceTopic, sinkTopic, temporaryFolderForOffset.getRoot().getAbsolutePath(), "localhost[" + locatorPort + "]"); - client.invoke(() -> { + client1.invoke(() -> { ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY) .create(sinkRegion); }); @@ -143,7 +139,7 @@ public class GeodeAsSinkDUnitTest { producer.send(new ProducerRecord(sinkTopic, "KEY" + i, "VALUE" + i)); } - client.invoke(() -> { + client1.invoke(() -> { Region region = ClusterStartupRule.getClientCache().getRegion(sinkRegion); await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java index 8b41d3e..7a0f05f 100644 --- a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java +++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java @@ -54,9 +54,6 @@ public class GeodeAsSourceDUnitTest { @Rule public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); - private static MemberVM locator, server; - private static ClientVM client; - @Rule public TestName testName = new TestName(); @@ -103,12 +100,11 @@ public class GeodeAsSourceDUnitTest { @Test public void whenDataIsInsertedInGeodeSourceThenKafkaConsumerMustReceiveEvents() throws Exception { - locator = clusterStartupRule.startLocatorVM(0, 10334); + MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334); int locatorPort = locator.getPort(); - server = clusterStartupRule.startServerVM(1, locatorPort); - client = - clusterStartupRule - .startClientVM(2, client -> client.withLocatorConnection(locatorPort)); + MemberVM server = clusterStartupRule.startServerVM(1, locatorPort); + ClientVM client1 = clusterStartupRule + .startClientVM(2, client -> client.withLocatorConnection(locatorPort)); int NUM_EVENT = 10; // Set unique names for all the different components @@ -128,7 +124,7 @@ public class GeodeAsSourceDUnitTest { ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION) .create(sinkRegion); }); - client.invoke(() -> { + client1.invoke(() -> { ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY) .create(sourceRegion); }); @@ -152,7 +148,7 @@ public class GeodeAsSourceDUnitTest { Consumer<String, String> consumer = createConsumer(sourceTopic); // Insert data into the Apache Geode source from the client - client.invoke(() -> { + client1.invoke(() -> { Region region = ClusterStartupRule.getClientCache().getRegion(sourceRegion); for (int i = 0; i < NUM_EVENT; i++) { region.put("KEY" + i, "VALUE" + i);