This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 792e301 geode-dunit dependency added and naming changes
792e301 is described below
commit 792e30154439c68f82274bc54763c5f13ed9078d
Author: Nabarun Nag <[email protected]>
AuthorDate: Tue Feb 18 20:39:10 2020 -0800
geode-dunit dependency added and naming changes
* Adding geode-dunit to run the tests.
* Adding the dunit folder to the gitignore
* adding the sink tests
* Fixing the sink test issue
* Renamed the package to org.apache
* Spotless and more apache renaming.
* Removed checked in
---
.gitignore | 3 +-
build.gradle | 5 +-
.../geode/kafka/GeodeConnectorConfig.java | 2 +-
.../org/{ => apache}/geode/kafka/GeodeContext.java | 8 +-
.../{ => apache}/geode/kafka/LocatorHostPort.java | 2 +-
.../kafka/security/SystemPropertyAuthInit.java | 2 +-
.../geode/kafka/sink/BatchRecords.java | 2 +-
.../geode/kafka/sink/GeodeKafkaSink.java | 9 +-
.../geode/kafka/sink/GeodeKafkaSinkTask.java | 4 +-
.../geode/kafka/sink/GeodeSinkConnectorConfig.java | 5 +-
.../geode/kafka/source/EventBufferSupplier.java | 2 +-
.../geode/kafka/source/GeodeEvent.java | 2 +-
.../geode/kafka/source/GeodeKafkaSource.java | 7 +-
.../kafka/source/GeodeKafkaSourceListener.java | 2 +-
.../geode/kafka/source/GeodeKafkaSourceTask.java | 4 +-
.../kafka/source/GeodeSourceConnectorConfig.java | 5 +-
.../kafka/source/SharedEventBufferSupplier.java | 2 +-
.../apache/geode/kafka/GeodeAsSinkDUnitTest.java | 163 ++++++++
.../apache/geode/kafka/GeodeAsSourceDUnitTest.java | 173 +++++++++
.../geode/kafka/GeodeConnectorConfigTest.java | 10 +-
.../kafka/security/SystemPropertyAuthInitTest.java | 28 ++
.../geode/kafka/sink/BatchRecordsTest.java | 2 +-
.../geode/kafka/sink/GeodeKafkaSinkTaskTest.java | 8 +-
.../geode/kafka/sink/GeodeKafkaSinkTest.java | 7 +-
.../kafka/source/GeodeKafkaSourceTaskTest.java | 8 +-
.../geode/kafka/source/GeodeKafkaSourceTest.java | 7 +-
.../source/GeodeSourceConnectorConfigTest.java | 7 +-
.../source/SharedEventBufferSupplierTest.java | 2 +-
.../geode/kafka/utilities/GeodeKafkaTestUtils.java | 161 ++++++++
.../geode/kafka/utilities}/JavaProcess.java | 14 +-
.../geode/kafka/utilities}/KafkaLocalCluster.java | 2 +-
.../kafka/utilities}/WorkerAndHerderCluster.java | 9 +-
.../kafka/utilities}/WorkerAndHerderWrapper.java | 35 +-
.../kafka/utilities}/ZooKeeperLocalCluster.java | 3 +-
.../java/org/geode/kafka/GeodeContextTest.java | 18 -
.../org/geode/kafka/GeodeKafkaTestCluster.java | 413 ---------------------
.../java/org/geode/kafka/GeodeLocalCluster.java | 41 --
.../org/geode/kafka/LocatorLauncherWrapper.java | 51 ---
.../org/geode/kafka/ServerLauncherWrapper.java | 70 ----
.../kafka/security/SystemPropertyAuthInitTest.java | 27 --
40 files changed, 633 insertions(+), 692 deletions(-)
diff --git a/.gitignore b/.gitignore
index 74613b4..2e856f5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -234,4 +234,5 @@ dist/
nbdist/
.nb-gradle/
-.idea/
\ No newline at end of file
+.idea/
+**/dunit
diff --git a/build.gradle b/build.gradle
index 2b7f853..add42c7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -73,10 +73,11 @@ dependencies {
testCompile(group: 'org.apache.kafka', name: 'connect-runtime', version:
'2.3.1')
testCompile(group: 'junit', name: 'junit', version: '4.12')
+// testCompile("org.junit.jupiter:junit-jupiter-params:5.4.2")
testCompile('org.mockito:mockito-core:3.2.4')
testCompile('pl.pragmatists:JUnitParams:1.1.1')
-
- testImplementation 'org.awaitility:awaitility:4.0.2'
+ testCompile(group: 'org.apache.geode', name: 'geode-dunit', version:
'1.9.0')
+ testImplementation 'org.awaitility:awaitility:3.1.6'
}
shadowJar {
diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
similarity index 99%
rename from src/main/java/org/geode/kafka/GeodeConnectorConfig.java
rename to src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 7fbfb55..476c07c 100644
--- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka;
+package org.apache.geode.kafka;
import java.util.Arrays;
import java.util.Collection;
diff --git a/src/main/java/org/geode/kafka/GeodeContext.java
b/src/main/java/org/apache/geode/kafka/GeodeContext.java
similarity index 93%
rename from src/main/java/org/geode/kafka/GeodeContext.java
rename to src/main/java/org/apache/geode/kafka/GeodeContext.java
index 9f30242..8833fd9 100644
--- a/src/main/java/org/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java
@@ -12,11 +12,11 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka;
+package org.apache.geode.kafka;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
+import static
org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
+import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
import java.util.List;
diff --git a/src/main/java/org/geode/kafka/LocatorHostPort.java
b/src/main/java/org/apache/geode/kafka/LocatorHostPort.java
similarity index 97%
rename from src/main/java/org/geode/kafka/LocatorHostPort.java
rename to src/main/java/org/apache/geode/kafka/LocatorHostPort.java
index 5c71fa1..d879d8e 100644
--- a/src/main/java/org/geode/kafka/LocatorHostPort.java
+++ b/src/main/java/org/apache/geode/kafka/LocatorHostPort.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka;
+package org.apache.geode.kafka;
public class LocatorHostPort {
diff --git a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
similarity index 97%
rename from src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
rename to
src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
index 6b646ee..4f3e414 100644
--- a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
+++ b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.security;
+package org.apache.geode.kafka.security;
import java.util.Properties;
diff --git a/src/main/java/org/geode/kafka/sink/BatchRecords.java
b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
similarity index 98%
rename from src/main/java/org/geode/kafka/sink/BatchRecords.java
rename to src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
index 049abac..45a93d6 100644
--- a/src/main/java/org/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
import java.util.ArrayList;
import java.util.Collection;
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
similarity index 92%
rename from src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
rename to src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
index 9ee5189..2bdd99a 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
@@ -12,9 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.sink;
-
-import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
+package org.apache.geode.kafka.sink;
import java.util.ArrayList;
import java.util.HashMap;
@@ -24,7 +22,8 @@ import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
-import org.geode.kafka.GeodeConnectorConfig;
+
+import org.apache.geode.kafka.GeodeConnectorConfig;
public class GeodeKafkaSink extends SinkConnector {
private Map<String, String> sharedProps;
@@ -62,7 +61,7 @@ public class GeodeKafkaSink extends SinkConnector {
@Override
public ConfigDef config() {
- return SINK_CONFIG_DEF;
+ return GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
}
@Override
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
similarity index 98%
rename from src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
rename to src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index 7c77c7f..eaf0f66 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
import java.util.Collection;
import java.util.HashMap;
@@ -22,13 +22,13 @@ import java.util.stream.Collectors;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
-import org.geode.kafka.GeodeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.kafka.GeodeContext;
/**
diff --git a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
similarity index 96%
rename from src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
rename to
src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
index a074220..b12eb8f 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -12,13 +12,14 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
-import org.geode.kafka.GeodeConnectorConfig;
+
+import org.apache.geode.kafka.GeodeConnectorConfig;
public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
public static final ConfigDef SINK_CONFIG_DEF = configurables();
diff --git a/src/main/java/org/geode/kafka/source/EventBufferSupplier.java
b/src/main/java/org/apache/geode/kafka/source/EventBufferSupplier.java
similarity index 96%
rename from src/main/java/org/geode/kafka/source/EventBufferSupplier.java
rename to src/main/java/org/apache/geode/kafka/source/EventBufferSupplier.java
index be40602..843c305 100644
--- a/src/main/java/org/geode/kafka/source/EventBufferSupplier.java
+++ b/src/main/java/org/apache/geode/kafka/source/EventBufferSupplier.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
diff --git a/src/main/java/org/geode/kafka/source/GeodeEvent.java
b/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java
similarity index 97%
rename from src/main/java/org/geode/kafka/source/GeodeEvent.java
rename to src/main/java/org/apache/geode/kafka/source/GeodeEvent.java
index 5b51d07..654b05a 100644
--- a/src/main/java/org/geode/kafka/source/GeodeEvent.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
import org.apache.geode.cache.query.CqEvent;
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
similarity index 93%
rename from src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
rename to src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
index 7b4445e..d2418fb 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
@@ -12,9 +12,9 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
-import static
org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+import static
org.apache.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,7 +26,8 @@ import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;
-import org.geode.kafka.GeodeConnectorConfig;
+
+import org.apache.geode.kafka.GeodeConnectorConfig;
public class GeodeKafkaSource extends SourceConnector {
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceListener.java
b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
similarity index 98%
rename from src/main/java/org/geode/kafka/source/GeodeKafkaSourceListener.java
rename to
src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
index e875ee4..1d16404 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
import java.util.concurrent.TimeUnit;
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
similarity index 98%
rename from src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
rename to src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 4acc081..4e5b415 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
import java.util.ArrayList;
import java.util.Collection;
@@ -23,7 +23,6 @@ import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
-import org.geode.kafka.GeodeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +30,7 @@ import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.Struct;
+import org.apache.geode.kafka.GeodeContext;
public class GeodeKafkaSourceTask extends SourceTask {
diff --git
a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
similarity index 98%
rename from src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
rename to
src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
index e96796b..ac70051 100644
--- a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
+++
b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -12,14 +12,15 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
-import org.geode.kafka.GeodeConnectorConfig;
+
+import org.apache.geode.kafka.GeodeConnectorConfig;
public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
diff --git
a/src/main/java/org/geode/kafka/source/SharedEventBufferSupplier.java
b/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java
similarity index 98%
rename from src/main/java/org/geode/kafka/source/SharedEventBufferSupplier.java
rename to
src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java
index 963a132..b3d1268 100644
--- a/src/main/java/org/geode/kafka/source/SharedEventBufferSupplier.java
+++ b/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
new file mode 100644
index 0000000..931de80
--- /dev/null
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -0,0 +1,163 @@
+package org.apache.geode.kafka;
+
+import static
org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createProducer;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createTopic;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.deleteTopic;
+import static
org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getKafkaConfig;
+import static
org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getZooKeeperProperties;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startKafka;
+import static
org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
+import static
org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startZooKeeper;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Time;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.kafka.utilities.KafkaLocalCluster;
+import org.apache.geode.kafka.utilities.WorkerAndHerderCluster;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+@RunWith(Parameterized.class)
+public class GeodeAsSinkDUnitTest {
+ @Rule
+ public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+ private static MemberVM locator, server;
+ private static ClientVM client;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @ClassRule
+ public static TemporaryFolder temporaryFolderForZooKeeper = new
TemporaryFolder();
+
+ @Rule
+ public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup()
+ throws Exception {
+ startZooKeeper(getZooKeeperProperties(temporaryFolderForZooKeeper));
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",
+ false,
+ 200000,
+ 15000,
+ 10,
+ Time.SYSTEM,
+ "myGroup",
+ "myMetricType",
+ null);
+
+ zkClient.close();
+ }
+
+ @Parameterized.Parameters(name = "tasks_{0}_partitions_{1}")
+ public static Iterable<Object[]> getParams() {
+ return Arrays.asList(new Object[][] {{1, 1}, {5, 10}, {15, 10}});
+ }
+
+ private final int numTask;
+ private final int numPartition;
+
+ public GeodeAsSinkDUnitTest(int numTask, int numPartition) {
+ this.numTask = numTask;
+ this.numPartition = numPartition;
+ }
+
+ @Test
+ public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents()
throws Exception {
+
+ locator = clusterStartupRule.startLocatorVM(0, 10334);
+ int locatorPort = locator.getPort();
+ server = clusterStartupRule.startServerVM(1, locatorPort);
+ client =
+ clusterStartupRule
+ .startClientVM(2, client ->
client.withLocatorConnection(locatorPort));
+ int NUM_EVENT = 10;
+
+ // Set unique names for all the different components
+ String testIdentifier = testName.getMethodName().replaceAll("\\[|\\]", "");
+ String sourceRegion = "SOURCE_REGION_" + testIdentifier;
+ String sinkRegion = "SINK_REGION_" + testIdentifier;
+ String sinkTopic = "SINK_TOPIC_" + testIdentifier;
+ String sourceTopic = "SOURCE_TOPIC_" + testIdentifier;
+
+ /**
+ * Start the Apache Geode cluster and create the source and sink regions.
+ * Create an Apache Geode client which verifies the data received in the sink
+ */
+ server.invoke(() -> {
+
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+ .create(sourceRegion);
+
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+ .create(sinkRegion);
+ });
+
+
+ /**
+ * Start the Kafka Cluster, workers and the topic to which the Apache
Geode will connect as
+ * a sink
+ */
+ WorkerAndHerderCluster workerAndHerderCluster = null;
+ KafkaLocalCluster kafkaLocalCluster = null;
+ try {
+ kafkaLocalCluster = startKafka(
+
getKafkaConfig(temporaryFolderForOffset.newFolder("kafkaLogs").getAbsolutePath()));
+ createTopic(sinkTopic, numPartition, 1);
+ // Create workers and herder cluster
+ workerAndHerderCluster = startWorkerAndHerderCluster(numTask,
sourceRegion, sinkRegion,
+ sourceTopic, sinkTopic,
temporaryFolderForOffset.getRoot().getAbsolutePath(),
+ "localhost[" + locatorPort + "]");
+ client.invoke(() -> {
+
ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(sinkRegion);
+ });
+ // Create the producer
+ Producer<String, String> producer = createProducer();
+
+ for (int i = 0; i < NUM_EVENT; i++) {
+ producer.send(new ProducerRecord(sinkTopic, "KEY" + i, "VALUE" + i));
+ }
+
+ client.invoke(() -> {
+ Region region =
ClusterStartupRule.getClientCache().getRegion(sinkRegion);
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+ });
+
+
+ } finally {
+ // Clean up by deleting the sink topic
+ deleteTopic(sinkTopic);
+ if (workerAndHerderCluster != null) {
+ workerAndHerderCluster.stop();
+ }
+ kafkaLocalCluster.stop();
+ }
+
+ }
+}
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
new file mode 100644
index 0000000..8b41d3e
--- /dev/null
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka;
+
+import static
org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createConsumer;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createTopic;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.deleteTopic;
+import static
org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getKafkaConfig;
+import static
org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getZooKeeperProperties;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startKafka;
+import static
org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
+import static
org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startZooKeeper;
+import static
org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.verifyEventsAreConsumed;
+
+import java.util.Arrays;
+
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.utils.Time;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.kafka.utilities.KafkaLocalCluster;
+import org.apache.geode.kafka.utilities.WorkerAndHerderCluster;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+@RunWith(Parameterized.class)
+public class GeodeAsSourceDUnitTest {
+ @Rule
+ public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+ private static MemberVM locator, server;
+ private static ClientVM client;
+
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @ClassRule
+ public static TemporaryFolder temporaryFolderForZooKeeper = new
TemporaryFolder();
+
+ @Rule
+ public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup()
+ throws Exception {
+ startZooKeeper(getZooKeeperProperties(temporaryFolderForZooKeeper));
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",
+ false,
+ 200000,
+ 15000,
+ 10,
+ Time.SYSTEM,
+ "myGroup",
+ "myMetricType",
+ null);
+
+ zkClient.close();
+ }
+
+ @Parameters(name = "tasks_{0}_partitions_{1}")
+ public static Iterable<Object[]> getParams() {
+ return Arrays.asList(new Object[][] {{1, 1}, {1, 2}, {5, 2}});
+ }
+
+ private final int numTask;
+ private final int numPartition;
+
+ public GeodeAsSourceDUnitTest(int numTask, int numPartition) {
+ this.numTask = numTask;
+ this.numPartition = numPartition;
+ }
+
+ @Test
+ public void
whenDataIsInsertedInGeodeSourceThenKafkaConsumerMustReceiveEvents() throws
Exception {
+ locator = clusterStartupRule.startLocatorVM(0, 10334);
+ int locatorPort = locator.getPort();
+ server = clusterStartupRule.startServerVM(1, locatorPort);
+ client =
+ clusterStartupRule
+ .startClientVM(2, client ->
client.withLocatorConnection(locatorPort));
+ int NUM_EVENT = 10;
+
+ // Set unique names for all the different components
+ String testIdentifier = testName.getMethodName().replaceAll("\\[|\\]", "");
+ String sourceRegion = "SOURCE_REGION_" + testIdentifier;
+ String sinkRegion = "SINK_REGION_" + testIdentifier;
+ String sinkTopic = "SINK_TOPIC_" + testIdentifier;
+ String sourceTopic = "SOURCE_TOPIC_" + testIdentifier;
+
+ /**
+ * Start the Apache Geode cluster and create the source and sink regions.
+ * Create an Apache Geode client which inserts data into the source
+ */
+ server.invoke(() -> {
+
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+ .create(sourceRegion);
+
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+ .create(sinkRegion);
+ });
+ client.invoke(() -> {
+
ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(sourceRegion);
+ });
+
+ /**
+ * Start the Kafka Cluster, workers and the topic to which the Apache
Geode will connect as
+ * a source
+ */
+ WorkerAndHerderCluster workerAndHerderCluster = null;
+ KafkaLocalCluster kafkaLocalCluster = null;
+ try {
+ kafkaLocalCluster = startKafka(
+
getKafkaConfig(temporaryFolderForOffset.newFolder("kafkaLogs").getAbsolutePath()));
+ createTopic(sourceTopic, numPartition, 1);
+ // Create workers and herder cluster
+ workerAndHerderCluster = startWorkerAndHerderCluster(numTask,
sourceRegion, sinkRegion,
+ sourceTopic, sinkTopic,
temporaryFolderForOffset.getRoot().getAbsolutePath(),
+ "localhost[" + locatorPort + "]");
+
+ // Create the consumer to consume from the source topic
+ Consumer<String, String> consumer = createConsumer(sourceTopic);
+
+ // Insert data into the Apache Geode source from the client
+ client.invoke(() -> {
+ Region region =
ClusterStartupRule.getClientCache().getRegion(sourceRegion);
+ for (int i = 0; i < NUM_EVENT; i++) {
+ region.put("KEY" + i, "VALUE" + i);
+ }
+ });
+
+ // Assert that all the data inserted in Apache Geode source is received
by the consumer
+ verifyEventsAreConsumed(consumer, NUM_EVENT);
+ } finally {
+ // Clean up by deleting the source topic
+ deleteTopic(sourceTopic);
+ if (workerAndHerderCluster != null) {
+ workerAndHerderCluster.stop();
+ }
+ kafkaLocalCluster.stop();
+ }
+ }
+}
diff --git a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
similarity index 96%
rename from src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
rename to src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
index 5c63d98..db7d921 100644
--- a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
@@ -12,11 +12,9 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka;
+package org.apache.geode.kafka;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
@@ -137,7 +135,7 @@ public class GeodeConnectorConfigTest {
@Test
public void usesSecurityShouldBeTrueIfSecurityUserSet() {
Map<String, String> props = new HashMap<>();
- props.put(SECURITY_USER, "some user");
+ props.put(GeodeConnectorConfig.SECURITY_USER, "some user");
GeodeConnectorConfig config =
new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
assertTrue(config.usesSecurity());
@@ -146,7 +144,7 @@ public class GeodeConnectorConfigTest {
@Test
public void usesSecurityShouldBeTrueIfSecurityClientAuthInitSet() {
Map<String, String> props = new HashMap<>();
- props.put(SECURITY_CLIENT_AUTH_INIT, "someclass");
+ props.put(GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT, "someclass");
GeodeConnectorConfig config =
new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
assertTrue(config.usesSecurity());
@@ -163,7 +161,7 @@ public class GeodeConnectorConfigTest {
@Test
public void securityClientAuthInitShouldBeSetIfUserIsSet() {
Map<String, String> props = new HashMap<>();
- props.put(SECURITY_USER, "some user");
+ props.put(GeodeConnectorConfig.SECURITY_USER, "some user");
GeodeConnectorConfig config =
new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
assertNotNull(config.getSecurityClientAuthInit());
diff --git
a/src/test/java/org/apache/geode/kafka/security/SystemPropertyAuthInitTest.java
b/src/test/java/org/apache/geode/kafka/security/SystemPropertyAuthInitTest.java
new file mode 100644
index 0000000..5b2d43c
--- /dev/null
+++
b/src/test/java/org/apache/geode/kafka/security/SystemPropertyAuthInitTest.java
@@ -0,0 +1,28 @@
+package org.apache.geode.kafka.security;
+
+import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
+import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import org.junit.Test;
+
+import org.apache.geode.security.AuthInitialize;
+
+public class SystemPropertyAuthInitTest {
+
+ @Test
+ public void userNameAndPasswordAreObtainedFromSecurityProps() {
+ SystemPropertyAuthInit auth = new SystemPropertyAuthInit();
+ String userName = "someUsername";
+ String password = "somePassword";
+
+ Properties securityProps = new Properties();
+ securityProps.put(SECURITY_USER, userName);
+ securityProps.put(SECURITY_PASSWORD, password);
+ Properties credentials = auth.getCredentials(securityProps, null, true);
+ assertEquals(credentials.get((AuthInitialize.SECURITY_USERNAME)),
userName);
+ assertEquals(credentials.get((AuthInitialize.SECURITY_PASSWORD)),
password);
+ }
+}
diff --git a/src/test/java/org/geode/kafka/sink/BatchRecordsTest.java
b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
similarity index 99%
rename from src/test/java/org/geode/kafka/sink/BatchRecordsTest.java
rename to src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
index c2da554..f59ab7b 100644
--- a/src/test/java/org/geode/kafka/sink/BatchRecordsTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
diff --git a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
similarity index 93%
rename from src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
rename to src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
index 32925fb..b38a597 100644
--- a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -12,10 +12,10 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
-import static
org.geode.kafka.sink.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
-import static
org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static
org.apache.geode.kafka.sink.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
+import static
org.apache.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -28,10 +28,10 @@ import java.util.HashMap;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;
-import org.geode.kafka.GeodeConnectorConfig;
import org.junit.Test;
import org.apache.geode.cache.Region;
+import org.apache.geode.kafka.GeodeConnectorConfig;
public class GeodeKafkaSinkTaskTest {
diff --git a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java
b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
similarity index 93%
rename from src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java
rename to src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
index 28e7033..0ae65da 100644
--- a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
@@ -12,9 +12,9 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
-import static
org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static
org.apache.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -23,9 +23,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import org.geode.kafka.GeodeConnectorConfig;
import org.junit.Test;
+import org.apache.geode.kafka.GeodeConnectorConfig;
+
public class GeodeKafkaSinkTest {
@Test
diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
similarity index 97%
rename from src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
rename to
src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 4fa7d81..bf427fd 100644
--- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -12,10 +12,10 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
-import static
org.geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
-import static
org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
+import static
org.apache.geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
+import static
org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -35,7 +35,6 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import org.geode.kafka.GeodeContext;
import org.junit.Test;
import org.apache.geode.cache.client.ClientCache;
@@ -44,6 +43,7 @@ import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.internal.ResultsBag;
+import org.apache.geode.kafka.GeodeContext;
public class GeodeKafkaSourceTaskTest {
diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTest.java
b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
similarity index 92%
rename from src/test/java/org/geode/kafka/source/GeodeKafkaSourceTest.java
rename to src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
index 433550a..38e9498 100644
--- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -12,9 +12,9 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
-import static
org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static
org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -23,9 +23,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import org.geode.kafka.GeodeConnectorConfig;
import org.junit.Test;
+import org.apache.geode.kafka.GeodeConnectorConfig;
+
public class GeodeKafkaSourceTest {
@Test
diff --git
a/src/test/java/org/geode/kafka/source/GeodeSourceConnectorConfigTest.java
b/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
similarity index 87%
rename from
src/test/java/org/geode/kafka/source/GeodeSourceConnectorConfigTest.java
rename to
src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
index fdcd7d3..d543d23 100644
--- a/src/test/java/org/geode/kafka/source/GeodeSourceConnectorConfigTest.java
+++
b/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
@@ -12,17 +12,18 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
-import static
org.geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static
org.apache.geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
-import org.geode.kafka.GeodeConnectorConfig;
import org.junit.Test;
+import org.apache.geode.kafka.GeodeConnectorConfig;
+
public class GeodeSourceConnectorConfigTest {
@Test
diff --git
a/src/test/java/org/geode/kafka/source/SharedEventBufferSupplierTest.java
b/src/test/java/org/apache/geode/kafka/source/SharedEventBufferSupplierTest.java
similarity index 98%
rename from
src/test/java/org/geode/kafka/source/SharedEventBufferSupplierTest.java
rename to
src/test/java/org/apache/geode/kafka/source/SharedEventBufferSupplierTest.java
index 92de30d..2683a61 100644
--- a/src/test/java/org/geode/kafka/source/SharedEventBufferSupplierTest.java
+++
b/src/test/java/org/apache/geode/kafka/source/SharedEventBufferSupplierTest.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
diff --git
a/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
new file mode 100644
index 0000000..f9c1e2b
--- /dev/null
+++ b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.utilities;
+
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import kafka.admin.RackAwareMode;
+import kafka.zk.AdminZkClient;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.rules.TemporaryFolder;
+
+public class GeodeKafkaTestUtils {
+ public static ZooKeeperLocalCluster startZooKeeper(Properties
zookeeperProperties)
+ throws IOException, QuorumPeerConfig.ConfigException {
+ ZooKeeperLocalCluster zooKeeperLocalCluster = new
ZooKeeperLocalCluster(zookeeperProperties);
+ zooKeeperLocalCluster.start();
+ return zooKeeperLocalCluster;
+ }
+
+ public static KafkaLocalCluster startKafka(Properties kafkaProperties)
+ throws IOException, InterruptedException {
+ KafkaLocalCluster kafkaLocalCluster = new
KafkaLocalCluster(kafkaProperties);
+ kafkaLocalCluster.start();
+ return kafkaLocalCluster;
+ }
+
+ public static void createTopic(String topicName, int numPartitions, int
replicationFactor) {
+ KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false,
200000,
+ 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
+
+ Properties topicProperties = new Properties();
+ topicProperties.put("flush.messages", "1");
+ AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+ adminZkClient.createTopic(topicName, numPartitions, replicationFactor,
topicProperties,
+ RackAwareMode.Disabled$.MODULE$);
+ }
+
+ public static void deleteTopic(String topicName) {
+ KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false,
200000,
+ 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
+ AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+ adminZkClient.deleteTopic(topicName);
+ }
+
+ public static Producer<String, String> createProducer() {
+ final Properties props = new Properties();
+ props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ StringSerializer.class.getName());
+
+ // Create the producer using props.
+ final Producer<String, String> producer =
+ new KafkaProducer<>(props);
+ return producer;
+ }
+
+ public static Properties getZooKeeperProperties(TemporaryFolder
temporaryFolder)
+ throws IOException {
+ Properties properties = new Properties();
+ properties.setProperty("dataDir",
temporaryFolder.newFolder("zookeeper").getAbsolutePath());
+ properties.setProperty("clientPort", "2181");
+ properties.setProperty("tickTime", "2000");
+ return properties;
+ }
+
+ public static Properties getKafkaConfig(String logPath) {
+ int BROKER_PORT = 9092;
+ Properties props = new Properties();
+ props.put("broker.id", "0");
+ props.put("zookeeper.connect", "localhost:2181");
+ props.put("host.name", "localHost");
+ props.put("port", BROKER_PORT);
+ props.put("offsets.topic.replication.factor", "1");
+ props.put("log.dirs", logPath);
+ return props;
+ }
+
+ // consumer props, less important, just for testing?
+ public static Consumer<String, String> createConsumer(String
testTopicForSource) {
+ final Properties props = new Properties();
+ props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG,
+ "myGroup");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ // Create the consumer using props.
+ final Consumer<String, String> consumer =
+ new KafkaConsumer<>(props);
+ // Subscribe to the topic.
+ consumer.subscribe(Collections.singletonList(testTopicForSource));
+ return consumer;
+ }
+
+ public static WorkerAndHerderCluster startWorkerAndHerderCluster(int
maxTasks,
+ String sourceRegion,
+ String sinkRegion,
+ String sourceTopic,
+ String sinkTopic,
+ String offsetPath,
+ String locatorString) {
+ WorkerAndHerderCluster workerAndHerderCluster = new
WorkerAndHerderCluster();
+ try {
+ workerAndHerderCluster.start(String.valueOf(maxTasks), sourceRegion,
sinkRegion, sourceTopic,
+ sinkTopic, offsetPath, locatorString);
+ Thread.sleep(20000);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not start the worker and herder
cluster" + e);
+ }
+ return workerAndHerderCluster;
+ }
+
+ public static void verifyEventsAreConsumed(Consumer<String, String>
consumer, int numEvents) {
+ AtomicInteger valueReceived = new AtomicInteger(0);
+ await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(2));
+ for (ConsumerRecord<String, String> record : records) {
+ valueReceived.incrementAndGet();
+ }
+ return valueReceived.get() == numEvents;
+ });
+
+ }
+}
diff --git a/src/test/java/org/geode/kafka/JavaProcess.java
b/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java
similarity index 79%
rename from src/test/java/org/geode/kafka/JavaProcess.java
rename to src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java
index 5355c75..c289c80 100644
--- a/src/test/java/org/geode/kafka/JavaProcess.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka;
+package org.apache.geode.kafka.utilities;
import java.io.File;
import java.io.IOException;
@@ -31,9 +31,17 @@ public class JavaProcess {
System.getProperty("java.home") + File.separator + "bin" +
File.separator + "java";
String classpath = System.getProperty("java.class.path");
String className = classWithMain.getName();
-
+ int commandLength = args.length + 4;
+ String[] processBuilderCommand = new String[commandLength];
+ processBuilderCommand[0] = java;
+ processBuilderCommand[1] = "-cp";
+ processBuilderCommand[2] = classpath;
+ processBuilderCommand[3] = className;
+ for (int i = 0; i < args.length; i++) {
+ processBuilderCommand[4 + i] = args[i];
+ }
ProcessBuilder builder = new ProcessBuilder(
- java, "-cp", classpath, className, convertArgsToString(args));
+ processBuilderCommand);
process = builder.inheritIO().start();
}
diff --git a/src/test/java/org/geode/kafka/KafkaLocalCluster.java
b/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java
similarity index 97%
rename from src/test/java/org/geode/kafka/KafkaLocalCluster.java
rename to src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java
index ee13f8c..338e819 100644
--- a/src/test/java/org/geode/kafka/KafkaLocalCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka;
+package org.apache.geode.kafka.utilities;
import java.io.IOException;
import java.util.Properties;
diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java
b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
similarity index 73%
rename from src/test/java/org/geode/kafka/WorkerAndHerderCluster.java
rename to
src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
index 824edab..7c58bc0 100644
--- a/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka;
+package org.apache.geode.kafka.utilities;
import java.io.IOException;
@@ -26,7 +26,14 @@ public class WorkerAndHerderCluster {
public void start(String maxTasks) throws IOException, InterruptedException {
workerAndHerder.exec(maxTasks);
+ }
+ public void start(String maxTasks, String sourceRegion, String sinkRegion,
String sourceTopic,
+ String sinkTopic, String offsetPath, String locatorString)
+ throws IOException, InterruptedException {
+ String[] args = new String[] {maxTasks, sourceRegion, sinkRegion,
sourceTopic, sinkTopic,
+ offsetPath, locatorString};
+ workerAndHerder.exec(args);
}
public void stop() {
diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
similarity index 74%
rename from src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
rename to
src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
index 3afcde7..3fe6e66 100644
--- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
@@ -12,10 +12,9 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka;
+package org.apache.geode.kafka.utilities;
-import static
org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
-import static
org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static
org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
import java.io.IOException;
import java.util.HashMap;
@@ -32,17 +31,31 @@ import
org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
-import org.geode.kafka.sink.GeodeKafkaSink;
-import org.geode.kafka.source.GeodeKafkaSource;
+
+import org.apache.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.sink.GeodeKafkaSink;
+import org.apache.geode.kafka.sink.GeodeSinkConnectorConfig;
+import org.apache.geode.kafka.source.GeodeKafkaSource;
public class WorkerAndHerderWrapper {
public static void main(String[] args) throws IOException {
+ if (args.length != 7) {
+ throw new RuntimeException("Insufficient arguments to start workers and
herders");
+ }
String maxTasks = args[0];
+ String sourceRegion = args[1];
+ String sinkRegion = args[2];
+ String sourceTopic = args[3];
+ String sinkTopic = args[4];
+ String offsetPath = args[5];
+ String regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]";
+ String topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]";
+ String locatorString = args[6];
Map props = new HashMap();
props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+ props.put("offset.storage.file.filename", offsetPath);
// fast flushing for testing.
props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
@@ -54,7 +67,7 @@ public class WorkerAndHerderWrapper {
"org.apache.kafka.connect.storage.StringConverter");
props.put("key.converter.schemas.enable", "false");
props.put("value.converter.schemas.enable", "false");
-
+ props.put(GeodeConnectorConfig.LOCATORS, locatorString);
WorkerConfig workerCfg = new StandaloneConfig(props);
MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
@@ -72,7 +85,8 @@ public class WorkerAndHerderWrapper {
sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
GeodeKafkaSource.class.getName());
sourceProps.put(ConnectorConfig.NAME_CONFIG,
"geode-kafka-source-connector");
sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
- sourceProps.put(REGION_TO_TOPIC_BINDINGS,
GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS);
+ sourceProps.put(GeodeConnectorConfig.LOCATORS, locatorString);
+ sourceProps.put(REGION_TO_TOPIC_BINDINGS, regionToTopicBinding);
herder.putConnectorConfig(
sourceProps.get(ConnectorConfig.NAME_CONFIG),
@@ -83,8 +97,9 @@ public class WorkerAndHerderWrapper {
sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
GeodeKafkaSink.class.getName());
sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
- sinkProps.put(TOPIC_TO_REGION_BINDINGS,
GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS);
- sinkProps.put("topics", GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK);
+ sinkProps.put(GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS,
topicToRegionBinding);
+ sinkProps.put(GeodeConnectorConfig.LOCATORS, locatorString);
+ sinkProps.put("topics", sinkTopic);
herder.putConnectorConfig(
sinkProps.get(ConnectorConfig.NAME_CONFIG),
diff --git a/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java
b/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java
similarity index 98%
rename from src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java
rename to
src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java
index 65ff819..8e5e7a9 100644
--- a/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.geode.kafka;
+package org.apache.geode.kafka.utilities;
import java.io.IOException;
import java.util.Properties;
@@ -52,5 +52,6 @@ public class ZooKeeperLocalCluster {
};
zooKeeperThread.start();
System.out.println("ZooKeeper thread started");
+
}
}
diff --git a/src/test/java/org/geode/kafka/GeodeContextTest.java
b/src/test/java/org/geode/kafka/GeodeContextTest.java
deleted file mode 100644
index eb10bee..0000000
--- a/src/test/java/org/geode/kafka/GeodeContextTest.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-package org.geode.kafka;
-
-public class GeodeContextTest {
-}
diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java
b/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java
deleted file mode 100644
index f620caa..0000000
--- a/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-package org.geode.kafka;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import kafka.admin.RackAwareMode;
-import kafka.zk.AdminZkClient;
-import kafka.zk.KafkaZkClient;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-
-public class GeodeKafkaTestCluster {
-
- @ClassRule
- public static TemporaryFolder temporaryFolder = new TemporaryFolder();
- private static boolean debug = true;
-
- public static String TEST_REGION_TO_TOPIC_BINDINGS =
"[someRegionForSource:someTopicForSource]";
- public static String TEST_TOPIC_TO_REGION_BINDINGS =
"[someTopicForSink:someRegionForSink]";
-
- public static String TEST_TOPIC_FOR_SOURCE = "someTopicForSource";
- public static String TEST_REGION_FOR_SOURCE = "someRegionForSource";
- public static String TEST_TOPIC_FOR_SINK = "someTopicForSink";
- public static String TEST_REGION_FOR_SINK = "someRegionForSink";
-
- private static ZooKeeperLocalCluster zooKeeperLocalCluster;
- private static KafkaLocalCluster kafkaLocalCluster;
- private static GeodeLocalCluster geodeLocalCluster;
- private static WorkerAndHerderCluster workerAndHerderCluster;
- private static Consumer<String, String> consumer;
-
- @BeforeClass
- public static void setup()
- throws IOException, QuorumPeerConfig.ConfigException,
InterruptedException {
- startZooKeeper();
- startKafka();
- startGeode();
- }
-
-
- @AfterClass
- public static void shutdown() {
- workerAndHerderCluster.stop();
- KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false,
200000,
- 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
- // AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE);
- // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
- zkClient.close();
- kafkaLocalCluster.stop();
- geodeLocalCluster.stop();
- }
-
-
- private static void startWorker(int maxTasks) throws IOException,
InterruptedException {
- workerAndHerderCluster = new WorkerAndHerderCluster();
- workerAndHerderCluster.start(String.valueOf(maxTasks));
- Thread.sleep(20000);
- }
-
- private static void createTopic(String topicName, int numPartitions, int
replicationFactor) {
- KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false,
200000,
- 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
-
- Properties topicProperties = new Properties();
- topicProperties.put("flush.messages", "1");
- AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.createTopic(topicName, numPartitions, replicationFactor,
topicProperties,
- RackAwareMode.Disabled$.MODULE$);
- }
-
- private static void deleteTopic(String topicName) {
- KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false,
200000,
- 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
- AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.deleteTopic(topicName);
- }
-
- private ClientCache createGeodeClient() {
- return new ClientCacheFactory().addPoolLocator("localhost",
10334).create();
- }
-
- private static void startZooKeeper() throws IOException,
QuorumPeerConfig.ConfigException {
- zooKeeperLocalCluster = new
ZooKeeperLocalCluster(getZooKeeperProperties());
- zooKeeperLocalCluster.start();
- }
-
- private static void startKafka()
- throws IOException, InterruptedException,
QuorumPeerConfig.ConfigException {
- kafkaLocalCluster = new KafkaLocalCluster(getKafkaConfig());
- kafkaLocalCluster.start();
- }
-
- private static void startGeode() throws IOException, InterruptedException {
- geodeLocalCluster = new GeodeLocalCluster();
- geodeLocalCluster.start();
- }
-
- private static Properties getZooKeeperProperties() throws IOException {
- Properties properties = new Properties();
- properties.setProperty("dataDir",
- (debug) ? "/tmp/zookeeper" :
temporaryFolder.newFolder("zookeeper").getAbsolutePath());
- properties.setProperty("clientPort", "2181");
- properties.setProperty("tickTime", "2000");
- return properties;
- }
-
-
- private static Properties getKafkaConfig() throws IOException {
- int BROKER_PORT = 9092;
- Properties props = new Properties();
-
- props.put("broker.id", "0");
- props.put("zookeeper.connect", "localhost:2181");
- props.put("host.name", "localHost");
- props.put("port", BROKER_PORT);
- props.put("offsets.topic.replication.factor", "1");
-
- // Specifically GeodeKafka connector configs
- return props;
- }
-
-
- // consumer props, less important, just for testing?
- public static Consumer<String, String> createConsumer() {
- final Properties props = new Properties();
- props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(ConsumerConfig.GROUP_ID_CONFIG,
- "myGroup");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- // Create the consumer using props.
- final Consumer<String, String> consumer =
- new KafkaConsumer<>(props);
- // Subscribe to the topic.
- consumer.subscribe(Collections.singletonList(TEST_TOPIC_FOR_SOURCE));
- return consumer;
- }
-
- // consumer props, less important, just for testing?
- public static Producer<String, String> createProducer() {
- final Properties props = new Properties();
- props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- StringSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- StringSerializer.class.getName());
-
- // Create the producer using props.
- final Producer<String, String> producer =
- new KafkaProducer<>(props);
- return producer;
- }
-
- @Test
- public void endToEndSourceTest() throws Exception {
- try {
- createTopic(TEST_TOPIC_FOR_SOURCE, 1, 1);
- startWorker(1);
- consumer = createConsumer();
-
- ClientCache client = createGeodeClient();
- Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY)
- .create(TEST_REGION_FOR_SOURCE);
-
- for (int i = 0; i < 10; i++) {
- region.put("KEY" + i, "VALUE" + i);
- }
-
- AtomicInteger valueReceived = new AtomicInteger(0);
- await().atMost(10, TimeUnit.SECONDS).until(() -> {
- ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(2));
- for (ConsumerRecord<String, String> record : records) {
- valueReceived.incrementAndGet();
- }
- return valueReceived.get() == 10;
- });
- } finally {
- deleteTopic(TEST_TOPIC_FOR_SOURCE);
- }
- }
-
-
- @Test
- public void endToEndSourceSingleRegionMultiTaskMultiPartitionTest() throws
Exception {
- try {
- createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1);
- startWorker(1);
- consumer = createConsumer();
-
- ClientCache client = createGeodeClient();
- Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY)
- .create(TEST_REGION_FOR_SOURCE);
-
- for (int i = 0; i < 10; i++) {
- region.put("KEY" + i, "VALUE" + i);
- }
-
- AtomicInteger valueReceived = new AtomicInteger(0);
- await().atMost(10, TimeUnit.SECONDS).until(() -> {
- ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(2));
- for (ConsumerRecord<String, String> record : records) {
- valueReceived.incrementAndGet();
- }
- return valueReceived.get() == 10;
- });
- } finally {
- deleteTopic(TEST_TOPIC_FOR_SOURCE);
- }
- }
-
- @Test
- public void
endToEndSourceSingleRegionMultiTaskMultiPartitionWithMoreTasksThanPartitionsTest()
- throws Exception {
- try {
- createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1);
- startWorker(5);
- consumer = createConsumer();
-
- ClientCache client = createGeodeClient();
- Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY)
- .create(TEST_REGION_FOR_SOURCE);
-
- for (int i = 0; i < 10; i++) {
- region.put("KEY" + i, "VALUE" + i);
- }
-
- AtomicInteger valueReceived = new AtomicInteger(0);
- await().atMost(10, TimeUnit.SECONDS).until(() -> {
- ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(2));
- for (ConsumerRecord<String, String> record : records) {
- valueReceived.incrementAndGet();
- }
- return valueReceived.get() == 10;
- });
- } finally {
- deleteTopic(TEST_TOPIC_FOR_SOURCE);
- }
- }
-
- @Test
- public void endToEndSinkTest() throws Exception {
- try {
- createTopic(TEST_TOPIC_FOR_SINK, 1, 1);
- startWorker(1);
- consumer = createConsumer();
-
- ClientCache client = createGeodeClient();
- Region region =
-
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
-
- Producer<String, String> producer = createProducer();
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i,
"VALUE" + i));
- }
-
- int i = 0;
- await().atMost(10, TimeUnit.SECONDS)
- .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
- } finally {
- deleteTopic(TEST_TOPIC_FOR_SINK);
- }
- }
-
-
- @Test
- public void
endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicSinkTest()
- throws Exception {
- try {
- createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
- startWorker(5);
- consumer = createConsumer();
-
- ClientCache client = createGeodeClient();
- Region region =
-
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
-
- Producer<String, String> producer = createProducer();
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i,
"VALUE" + i));
- }
-
- int i = 0;
- await().atMost(10, TimeUnit.SECONDS)
- .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
- } finally {
- deleteTopic(TEST_TOPIC_FOR_SINK);
- }
- }
-
- @Test
- public void
endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicWithMoreWorkersSinkTest()
- throws Exception {
- try {
- createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
- startWorker(15);
- consumer = createConsumer();
-
- ClientCache client = createGeodeClient();
- Region region =
-
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
-
- Producer<String, String> producer = createProducer();
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i,
"VALUE" + i));
- }
-
- int i = 0;
- await().atMost(10, TimeUnit.SECONDS)
- .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
- } finally {
- deleteTopic(TEST_TOPIC_FOR_SINK);
- }
- }
-
- @Test
- public void endToEndWithOneTaskForASingleBindingLessTasksThanPartitions()
throws Exception {
- try {
- createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
- startWorker(5);
- consumer = createConsumer();
-
- ClientCache client = createGeodeClient();
- Region region =
-
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
-
- Producer<String, String> producer = createProducer();
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i,
"VALUE" + i));
- }
-
- int i = 0;
- await().atMost(10, TimeUnit.SECONDS)
- .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
- } finally {
- deleteTopic(TEST_TOPIC_FOR_SINK);
- }
- }
-
- @Test
- public void endToEndWithOneTaskForASingleBindingMoreTasksThanPartitions()
throws Exception {
- try {
- createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
- startWorker(5);
- consumer = createConsumer();
-
- ClientCache client = createGeodeClient();
- Region region =
-
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
-
- Producer<String, String> producer = createProducer();
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, i,
UUID.randomUUID().toString(),
- UUID.randomUUID().toString()));
- }
-
- int i = 0;
- await().atMost(10, TimeUnit.SECONDS)
- .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
- } finally {
- deleteTopic(TEST_TOPIC_FOR_SINK);
- }
- }
-
-}
diff --git a/src/test/java/org/geode/kafka/GeodeLocalCluster.java
b/src/test/java/org/geode/kafka/GeodeLocalCluster.java
deleted file mode 100644
index 6784391..0000000
--- a/src/test/java/org/geode/kafka/GeodeLocalCluster.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-package org.geode.kafka;
-
-import java.io.IOException;
-
-public class GeodeLocalCluster {
-
- private JavaProcess locatorProcess;
- private JavaProcess serverProcess;
-
- public GeodeLocalCluster() {
- locatorProcess = new JavaProcess(LocatorLauncherWrapper.class);
- serverProcess = new JavaProcess(ServerLauncherWrapper.class);
- }
-
- public void start() throws IOException, InterruptedException {
- System.out.println("starting locator");
- locatorProcess.exec("10334");
- Thread.sleep(15000);
- serverProcess.exec("40404");
- Thread.sleep(30000);
- }
-
- public void stop() {
- serverProcess.destroy();
- locatorProcess.destroy();
- }
-}
diff --git a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
deleted file mode 100644
index bda962e..0000000
--- a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-package org.geode.kafka;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.Locator;
-
-public class LocatorLauncherWrapper {
-
- public static void main(String[] args) throws IOException {
- Properties properties = new Properties();
- // String statsFile = new File(context.getOutputDir(),
"stats.gfs").getAbsolutePath();
- //
properties.setProperty(ConfigurationPropert/**/ies.STATISTIC_ARCHIVE_FILE,
statsFile);
- properties.setProperty(ConfigurationProperties.NAME, "locator1");
-
- Locator.startLocatorAndDS(10334,
- new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log"),
properties);
- while (true) {
-
- }
- //
- // LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
- // .setMemberName("locator1")
- //// .setPort(Integer.valueOf(args[0]))
- //// .setBindAddress("localhost")
- // .build();
- //
- // locatorLauncher.start();
- // while (!locatorLauncher.isRunning()) {
- //
- // }
- // System.out.println(locatorLauncher.getBindAddress() + ":" +
locatorLauncher.getPort());
-
- }
-}
diff --git a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
deleted file mode 100644
index 1bc63fe..0000000
--- a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-package org.geode.kafka;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.distributed.ConfigurationProperties;
-
-public class ServerLauncherWrapper {
-
- public static void main(String... args) throws IOException {
- // ServerLauncher serverLauncher = new ServerLauncher.Builder()
- // .setMemberName("server1")
- //// .setServerPort(Integer.valueOf(args[0]))
- //// .setServerBindAddress("localhost")
- // // .set("locators", "localhost[10334]")
- //// .set("jmx-manager", "true")
- //// .set("jmx-manager-start", "true")
- // .build();
- //
- // serverLauncher.start();
- // System.out.println("Geode Server Launcher complete");
-
-
-
- Properties properties = new Properties();
- String locatorString = "localhost[10334]";
- // String statsFile = new File(context.getOutputDir(),
"stats.gfs").getAbsolutePath();
- Cache cache = new CacheFactory(properties)
- // .setPdxSerializer(new
ReflectionBasedAutoSerializer("benchmark.geode.data.*"))
- .set(ConfigurationProperties.LOCATORS, locatorString)
- .set(ConfigurationProperties.NAME,
- "server-1")
- .set(ConfigurationProperties.LOG_FILE,
- "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
- .set(ConfigurationProperties.LOG_LEVEL, "info")
- // .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile)
- .create();
- CacheServer cacheServer = cache.addCacheServer();
- cacheServer.setPort(0);
- cacheServer.start();
-
- // create the region
- cache.createRegionFactory(RegionShortcut.PARTITION).create(
- GeodeKafkaTestCluster.TEST_REGION_FOR_SINK);
- cache.createRegionFactory(RegionShortcut.PARTITION).create(
- GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE);
- System.out.println("starting cacheserver");
- while (true) {
-
- }
- }
-}
diff --git
a/src/test/java/org/geode/kafka/security/SystemPropertyAuthInitTest.java
b/src/test/java/org/geode/kafka/security/SystemPropertyAuthInitTest.java
deleted file mode 100644
index cdfd48c..0000000
--- a/src/test/java/org/geode/kafka/security/SystemPropertyAuthInitTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.geode.kafka.security;
-
-import org.apache.geode.security.AuthInitialize;
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
-import static org.junit.Assert.assertEquals;
-
-public class SystemPropertyAuthInitTest {
-
- @Test
- public void userNameAndPasswordAreObtainedFromSecurityProps() {
- SystemPropertyAuthInit auth = new SystemPropertyAuthInit();
- String userName = "someUsername";
- String password = "somePassword";
-
- Properties securityProps = new Properties();
- securityProps.put(SECURITY_USER, userName);
- securityProps.put(SECURITY_PASSWORD, password);
- Properties credentials = auth.getCredentials(securityProps, null,
true);
- assertEquals(credentials.get((AuthInitialize.SECURITY_USERNAME)),
userName);
- assertEquals(credentials.get((AuthInitialize.SECURITY_PASSWORD)),
password);
- }
-}