This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch feature/restructuring in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 7f72e657fcb36f1b104c98b3f381f84ecb60a953 Author: Naburun Nag <n...@cs.wisc.edu> AuthorDate: Tue Feb 18 20:07:28 2020 -0800 Renamed the package to org.apache --- .../geode/kafka/GeodeConnectorConfig.java | 2 +- .../org/{ => apache}/geode/kafka/GeodeContext.java | 2 +- .../{ => apache}/geode/kafka/LocatorHostPort.java | 2 +- .../kafka/security/SystemPropertyAuthInit.java | 2 +- .../geode/kafka/sink/BatchRecords.java | 2 +- .../geode/kafka/sink/GeodeKafkaSink.java | 8 +- .../geode/kafka/sink/GeodeKafkaSinkTask.java | 4 +- .../geode/kafka/sink/GeodeSinkConnectorConfig.java | 4 +- .../geode/kafka/source/EventBufferSupplier.java | 2 +- .../geode/kafka/source/GeodeEvent.java | 2 +- .../geode/kafka/source/GeodeKafkaSource.java | 6 +- .../kafka/source/GeodeKafkaSourceListener.java | 2 +- .../geode/kafka/source/GeodeKafkaSourceTask.java | 4 +- .../kafka/source/GeodeSourceConnectorConfig.java | 4 +- .../kafka/source/SharedEventBufferSupplier.java | 2 +- .../geode/kafka/GeodeAsSinkDUnitTest.java | 20 +- .../geode/kafka/GeodeAsSourceDUnitTest.java | 25 +- .../geode/kafka/GeodeConnectorConfigTest.java | 10 +- .../geode/kafka/sink/BatchRecordsTest.java | 2 +- .../geode/kafka/sink/GeodeKafkaSinkTaskTest.java | 4 +- .../geode/kafka/sink/GeodeKafkaSinkTest.java | 6 +- .../kafka/source/GeodeKafkaSourceTaskTest.java | 8 +- .../geode/kafka/source/GeodeKafkaSourceTest.java | 6 +- .../source/GeodeSourceConnectorConfigTest.java | 6 +- .../source/SharedEventBufferSupplierTest.java | 2 +- .../kafka/utilities}/GeodeKafkaTestUtils.java | 28 +- .../geode/kafka/utilities}/JavaProcess.java | 2 +- .../geode/kafka/utilities}/KafkaLocalCluster.java | 2 +- .../kafka/utilities}/WorkerAndHerderCluster.java | 2 +- .../kafka/utilities}/WorkerAndHerderWrapper.java | 41 +- .../kafka/utilities}/ZooKeeperLocalCluster.java | 2 +- .../java/org/geode/kafka/GeodeContextTest.java | 18 - .../org/geode/kafka/GeodeKafkaTestCluster.java | 412 --------------------- .../java/org/geode/kafka/GeodeLocalCluster.java | 41 -- .../org/geode/kafka/LocatorLauncherWrapper.java | 50 --- .../org/geode/kafka/ServerLauncherWrapper.java | 70 ---- 36 files changed, 108 insertions(+), 697 deletions(-) diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java similarity index 99% rename from src/main/java/org/geode/kafka/GeodeConnectorConfig.java rename to src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java index 7fbfb55..476c07c 100644 --- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java +++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka; +package org.apache.geode.kafka; import java.util.Arrays; import java.util.Collection; diff --git a/src/main/java/org/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java similarity index 99% rename from src/main/java/org/geode/kafka/GeodeContext.java rename to src/main/java/org/apache/geode/kafka/GeodeContext.java index 9f30242..02cc85f 100644 --- a/src/main/java/org/geode/kafka/GeodeContext.java +++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka; +package org.apache.geode.kafka; import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT; import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD; diff --git a/src/main/java/org/geode/kafka/LocatorHostPort.java b/src/main/java/org/apache/geode/kafka/LocatorHostPort.java similarity index 97% rename from src/main/java/org/geode/kafka/LocatorHostPort.java rename to src/main/java/org/apache/geode/kafka/LocatorHostPort.java index 5c71fa1..d879d8e 100644 --- a/src/main/java/org/geode/kafka/LocatorHostPort.java +++ b/src/main/java/org/apache/geode/kafka/LocatorHostPort.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka; +package org.apache.geode.kafka; public class LocatorHostPort { diff --git a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java similarity index 97% rename from src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java rename to src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java index 6b646ee..4f3e414 100644 --- a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java +++ b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.security; +package org.apache.geode.kafka.security; import java.util.Properties; diff --git a/src/main/java/org/geode/kafka/sink/BatchRecords.java b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java similarity index 98% rename from src/main/java/org/geode/kafka/sink/BatchRecords.java rename to src/main/java/org/apache/geode/kafka/sink/BatchRecords.java index 049abac..45a93d6 100644 --- a/src/main/java/org/geode/kafka/sink/BatchRecords.java +++ b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.sink; +package org.apache.geode.kafka.sink; import java.util.ArrayList; import java.util.Collection; diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java similarity index 91% rename from src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java rename to src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java index 9ee5189..e4754f7 100644 --- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java +++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java @@ -12,9 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.sink; - -import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF; +package org.apache.geode.kafka.sink; import java.util.ArrayList; import java.util.HashMap; @@ -24,7 +22,7 @@ import java.util.Map; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; -import org.geode.kafka.GeodeConnectorConfig; +import org.apache.geode.kafka.GeodeConnectorConfig; public class GeodeKafkaSink extends SinkConnector { private Map<String, String> sharedProps; @@ -62,7 +60,7 @@ public class GeodeKafkaSink extends SinkConnector { @Override public ConfigDef config() { - return SINK_CONFIG_DEF; + return GeodeSinkConnectorConfig.SINK_CONFIG_DEF; } @Override diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java similarity index 98% rename from src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java rename to src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java index 7c77c7f..023284e 100644 --- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java +++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.sink; +package org.apache.geode.kafka.sink; import java.util.Collection; import java.util.HashMap; @@ -22,7 +22,7 @@ import java.util.stream.Collectors; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; -import org.geode.kafka.GeodeContext; +import org.apache.geode.kafka.GeodeContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java similarity index 96% rename from src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java rename to src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java index a074220..47eb165 100644 --- a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java +++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java @@ -12,13 +12,13 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.sink; +package org.apache.geode.kafka.sink; import java.util.List; import java.util.Map; import org.apache.kafka.common.config.ConfigDef; -import org.geode.kafka.GeodeConnectorConfig; +import org.apache.geode.kafka.GeodeConnectorConfig; public class GeodeSinkConnectorConfig extends GeodeConnectorConfig { public static final ConfigDef SINK_CONFIG_DEF = configurables(); diff --git a/src/main/java/org/geode/kafka/source/EventBufferSupplier.java b/src/main/java/org/apache/geode/kafka/source/EventBufferSupplier.java similarity index 96% rename from src/main/java/org/geode/kafka/source/EventBufferSupplier.java rename to src/main/java/org/apache/geode/kafka/source/EventBufferSupplier.java index be40602..843c305 100644 --- a/src/main/java/org/geode/kafka/source/EventBufferSupplier.java +++ b/src/main/java/org/apache/geode/kafka/source/EventBufferSupplier.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.source; +package org.apache.geode.kafka.source; import java.util.concurrent.BlockingQueue; import java.util.function.Supplier; diff --git a/src/main/java/org/geode/kafka/source/GeodeEvent.java b/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java similarity index 97% rename from src/main/java/org/geode/kafka/source/GeodeEvent.java rename to src/main/java/org/apache/geode/kafka/source/GeodeEvent.java index 5b51d07..654b05a 100644 --- a/src/main/java/org/geode/kafka/source/GeodeEvent.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.source; +package org.apache.geode.kafka.source; import org.apache.geode.cache.query.CqEvent; diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java similarity index 93% rename from src/main/java/org/geode/kafka/source/GeodeKafkaSource.java rename to src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java index 7b4445e..e8bfaf1 100644 --- a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java @@ -12,9 +12,9 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.source; +package org.apache.geode.kafka.source; -import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF; +import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF; import java.util.ArrayList; import java.util.HashMap; @@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.util.ConnectorUtils; -import org.geode.kafka.GeodeConnectorConfig; +import org.apache.geode.kafka.GeodeConnectorConfig; public class GeodeKafkaSource extends SourceConnector { diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java similarity index 98% rename from src/main/java/org/geode/kafka/source/GeodeKafkaSourceListener.java rename to src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java index e875ee4..1d16404 100644 --- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceListener.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.source; +package org.apache.geode.kafka.source; import java.util.concurrent.TimeUnit; diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java similarity index 98% rename from src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java rename to src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java index 4acc081..58de173 100644 --- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.source; +package org.apache.geode.kafka.source; import java.util.ArrayList; import java.util.Collection; @@ -23,7 +23,7 @@ import java.util.stream.Collectors; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; -import org.geode.kafka.GeodeContext; +import org.apache.geode.kafka.GeodeContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java similarity index 98% rename from src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java rename to src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java index e96796b..339551a 100644 --- a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java @@ -12,14 +12,14 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.source; +package org.apache.geode.kafka.source; import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.kafka.common.config.ConfigDef; -import org.geode.kafka.GeodeConnectorConfig; +import org.apache.geode.kafka.GeodeConnectorConfig; public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { diff --git a/src/main/java/org/geode/kafka/source/SharedEventBufferSupplier.java b/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java similarity index 98% rename from src/main/java/org/geode/kafka/source/SharedEventBufferSupplier.java rename to src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java index 963a132..b3d1268 100644 --- a/src/main/java/org/geode/kafka/source/SharedEventBufferSupplier.java +++ b/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.source; +package org.apache.geode.kafka.source; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; diff --git a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java similarity index 85% rename from src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java rename to src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java index b0a11e6..5cd3618 100644 --- a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java +++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java @@ -1,14 +1,14 @@ -package org.geode.kafka; +package org.apache.geode.kafka; import static org.awaitility.Awaitility.await; -import static org.geode.kafka.GeodeKafkaTestUtils.createProducer; -import static org.geode.kafka.GeodeKafkaTestUtils.createTopic; -import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic; -import static org.geode.kafka.GeodeKafkaTestUtils.getKafkaConfig; -import static org.geode.kafka.GeodeKafkaTestUtils.getZooKeeperProperties; -import static org.geode.kafka.GeodeKafkaTestUtils.startKafka; -import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster; -import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createProducer; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createTopic; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.deleteTopic; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getKafkaConfig; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getZooKeeperProperties; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startKafka; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startWorkerAndHerderCluster; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startZooKeeper; import static org.junit.Assert.assertEquals; import java.util.Arrays; @@ -31,6 +31,8 @@ import org.junit.runners.Parameterized; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.kafka.utilities.KafkaLocalCluster; +import org.apache.geode.kafka.utilities.WorkerAndHerderCluster; import org.apache.geode.test.dunit.rules.ClientVM; import org.apache.geode.test.dunit.rules.ClusterStartupRule; import org.apache.geode.test.dunit.rules.MemberVM; diff --git a/src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java similarity index 85% rename from src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java rename to src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java index d300b55..08aede2 100644 --- a/src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java +++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java @@ -12,17 +12,17 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka; - -import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer; -import static org.geode.kafka.GeodeKafkaTestUtils.createTopic; -import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic; -import static org.geode.kafka.GeodeKafkaTestUtils.getKafkaConfig; -import static org.geode.kafka.GeodeKafkaTestUtils.getZooKeeperProperties; -import static org.geode.kafka.GeodeKafkaTestUtils.startKafka; -import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster; -import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper; -import static org.geode.kafka.GeodeKafkaTestUtils.verifyEventsAreConsumed; +package org.apache.geode.kafka; + +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createConsumer; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createTopic; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.deleteTopic; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getKafkaConfig; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getZooKeeperProperties; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startKafka; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startWorkerAndHerderCluster; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startZooKeeper; +import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.verifyEventsAreConsumed; import java.util.Arrays; @@ -43,6 +43,9 @@ import org.junit.runners.Parameterized.Parameters; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.kafka.utilities.GeodeKafkaTestUtils; +import org.apache.geode.kafka.utilities.KafkaLocalCluster; +import org.apache.geode.kafka.utilities.WorkerAndHerderCluster; import org.apache.geode.test.dunit.rules.ClientVM; import org.apache.geode.test.dunit.rules.ClusterStartupRule; import org.apache.geode.test.dunit.rules.MemberVM; diff --git a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java similarity index 96% rename from src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java rename to src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java index 5c63d98..db7d921 100644 --- a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java +++ b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java @@ -12,11 +12,9 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka; +package org.apache.geode.kafka; -import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT; -import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; @@ -137,7 +135,7 @@ public class GeodeConnectorConfigTest { @Test public void usesSecurityShouldBeTrueIfSecurityUserSet() { Map<String, String> props = new HashMap<>(); - props.put(SECURITY_USER, "some user"); + props.put(GeodeConnectorConfig.SECURITY_USER, "some user"); GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); assertTrue(config.usesSecurity()); @@ -146,7 +144,7 @@ public class GeodeConnectorConfigTest { @Test public void usesSecurityShouldBeTrueIfSecurityClientAuthInitSet() { Map<String, String> props = new HashMap<>(); - props.put(SECURITY_CLIENT_AUTH_INIT, "someclass"); + props.put(GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT, "someclass"); GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); assertTrue(config.usesSecurity()); @@ -163,7 +161,7 @@ public class GeodeConnectorConfigTest { @Test public void securityClientAuthInitShouldBeSetIfUserIsSet() { Map<String, String> props = new HashMap<>(); - props.put(SECURITY_USER, "some user"); + props.put(GeodeConnectorConfig.SECURITY_USER, "some user"); GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); assertNotNull(config.getSecurityClientAuthInit()); diff --git a/src/test/java/org/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java similarity index 99% rename from src/test/java/org/geode/kafka/sink/BatchRecordsTest.java rename to src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java index c2da554..f59ab7b 100644 --- a/src/test/java/org/geode/kafka/sink/BatchRecordsTest.java +++ b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.sink; +package org.apache.geode.kafka.sink; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; diff --git a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java similarity index 97% rename from src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java rename to src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java index 32925fb..dd325f2 100644 --- a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java +++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.sink; +package org.apache.geode.kafka.sink; import static org.geode.kafka.sink.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE; import static org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS; @@ -28,7 +28,7 @@ import java.util.HashMap; import java.util.List; import org.apache.kafka.connect.sink.SinkRecord; -import org.geode.kafka.GeodeConnectorConfig; +import org.apache.geode.kafka.GeodeConnectorConfig; import org.junit.Test; import org.apache.geode.cache.Region; diff --git a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java similarity index 93% rename from src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java rename to src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java index 28e7033..ea5cf1e 100644 --- a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java +++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java @@ -12,9 +12,9 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.sink; +package org.apache.geode.kafka.sink; -import static org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS; +import static org.apache.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -23,7 +23,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import org.geode.kafka.GeodeConnectorConfig; +import org.apache.geode.kafka.GeodeConnectorConfig; import org.junit.Test; public class GeodeKafkaSinkTest { diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java similarity index 97% rename from src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java rename to src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java index 4fa7d81..5125a91 100644 --- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java +++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java @@ -12,10 +12,10 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.source; +package org.apache.geode.kafka.source; -import static org.geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX; -import static org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION; +import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX; +import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -35,7 +35,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import org.geode.kafka.GeodeContext; +import org.apache.geode.kafka.GeodeContext; import org.junit.Test; import org.apache.geode.cache.client.ClientCache; diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java similarity index 92% rename from src/test/java/org/geode/kafka/source/GeodeKafkaSourceTest.java rename to src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java index 433550a..786fccf 100644 --- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTest.java +++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java @@ -12,9 +12,9 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.source; +package org.apache.geode.kafka.source; -import static org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS; +import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -23,7 +23,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import org.geode.kafka.GeodeConnectorConfig; +import org.apache.geode.kafka.GeodeConnectorConfig; import org.junit.Test; public class GeodeKafkaSourceTest { diff --git a/src/test/java/org/geode/kafka/source/GeodeSourceConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java similarity index 87% rename from src/test/java/org/geode/kafka/source/GeodeSourceConnectorConfigTest.java rename to src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java index fdcd7d3..641a2ac 100644 --- a/src/test/java/org/geode/kafka/source/GeodeSourceConnectorConfigTest.java +++ b/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java @@ -12,15 +12,15 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.source; +package org.apache.geode.kafka.source; -import static org.geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX; +import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX; import static org.junit.Assert.assertEquals; import java.util.HashMap; import java.util.Map; -import org.geode.kafka.GeodeConnectorConfig; +import org.apache.geode.kafka.GeodeConnectorConfig; import org.junit.Test; public class GeodeSourceConnectorConfigTest { diff --git a/src/test/java/org/geode/kafka/source/SharedEventBufferSupplierTest.java b/src/test/java/org/apache/geode/kafka/source/SharedEventBufferSupplierTest.java similarity index 98% rename from src/test/java/org/geode/kafka/source/SharedEventBufferSupplierTest.java rename to src/test/java/org/apache/geode/kafka/source/SharedEventBufferSupplierTest.java index 92de30d..2683a61 100644 --- a/src/test/java/org/geode/kafka/source/SharedEventBufferSupplierTest.java +++ b/src/test/java/org/apache/geode/kafka/source/SharedEventBufferSupplierTest.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka.source; +package org.apache.geode.kafka.source; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java similarity index 82% rename from src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java rename to src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java index abe0d6a..c8e1bb0 100644 --- a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java +++ b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka; +package org.apache.geode.kafka.utilities; import static org.awaitility.Awaitility.await; @@ -42,21 +42,21 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.junit.rules.TemporaryFolder; public class GeodeKafkaTestUtils { - protected static ZooKeeperLocalCluster startZooKeeper(Properties zookeeperProperties) + public static ZooKeeperLocalCluster startZooKeeper(Properties zookeeperProperties) throws IOException, QuorumPeerConfig.ConfigException { ZooKeeperLocalCluster zooKeeperLocalCluster = new ZooKeeperLocalCluster(zookeeperProperties); zooKeeperLocalCluster.start(); return zooKeeperLocalCluster; } - protected static KafkaLocalCluster startKafka(Properties kafkaProperties) + public static KafkaLocalCluster startKafka(Properties kafkaProperties) throws IOException, InterruptedException { KafkaLocalCluster kafkaLocalCluster = new KafkaLocalCluster(kafkaProperties); kafkaLocalCluster.start(); return kafkaLocalCluster; } - protected static void createTopic(String topicName, int numPartitions, int replicationFactor) { + public static void createTopic(String topicName, int numPartitions, int replicationFactor) { KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000, 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null); @@ -67,14 +67,14 @@ public class GeodeKafkaTestUtils { RackAwareMode.Disabled$.MODULE$); } - protected static void deleteTopic(String topicName) { + public static void deleteTopic(String topicName) { KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000, 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null); AdminZkClient adminZkClient = new AdminZkClient(zkClient); adminZkClient.deleteTopic(topicName); } - protected static Producer<String, String> createProducer() { + public static Producer<String, String> createProducer() { final Properties props = new Properties(); props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -88,7 +88,7 @@ public class GeodeKafkaTestUtils { return producer; } - protected static Properties getZooKeeperProperties(TemporaryFolder temporaryFolder) + public static Properties getZooKeeperProperties(TemporaryFolder temporaryFolder) throws IOException { Properties properties = new Properties(); properties.setProperty("dataDir", temporaryFolder.newFolder("zookeeper").getAbsolutePath()); @@ -97,7 +97,7 @@ public class GeodeKafkaTestUtils { return properties; } - protected static Properties getKafkaConfig(String logPath) { + public static Properties getKafkaConfig(String logPath) { int BROKER_PORT = 9092; Properties props = new Properties(); props.put("broker.id", "0"); @@ -129,9 +129,13 @@ public class GeodeKafkaTestUtils { return consumer; } - protected static WorkerAndHerderCluster startWorkerAndHerderCluster(int maxTasks, - String sourceRegion, String sinkRegion, String sourceTopic, String sinkTopic, - String offsetPath, String locatorString) { + public static WorkerAndHerderCluster startWorkerAndHerderCluster(int maxTasks, + String sourceRegion, + String sinkRegion, + String sourceTopic, + String sinkTopic, + String offsetPath, + String locatorString) { WorkerAndHerderCluster workerAndHerderCluster = new WorkerAndHerderCluster(); try { workerAndHerderCluster.start(String.valueOf(maxTasks), sourceRegion, sinkRegion, sourceTopic, @@ -143,7 +147,7 @@ public class GeodeKafkaTestUtils { return workerAndHerderCluster; } - protected static void verifyEventsAreConsumed(Consumer<String, String> consumer, int numEvents) { + public static void verifyEventsAreConsumed(Consumer<String, String> consumer, int numEvents) { AtomicInteger valueReceived = new AtomicInteger(0); await().atMost(10, TimeUnit.SECONDS).until(() -> { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); diff --git a/src/test/java/org/geode/kafka/JavaProcess.java b/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java similarity index 98% rename from src/test/java/org/geode/kafka/JavaProcess.java rename to src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java index a88638b..c289c80 100644 --- a/src/test/java/org/geode/kafka/JavaProcess.java +++ b/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka; +package org.apache.geode.kafka.utilities; import java.io.File; import java.io.IOException; diff --git a/src/test/java/org/geode/kafka/KafkaLocalCluster.java b/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java similarity index 97% rename from src/test/java/org/geode/kafka/KafkaLocalCluster.java rename to src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java index ee13f8c..338e819 100644 --- a/src/test/java/org/geode/kafka/KafkaLocalCluster.java +++ b/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka; +package org.apache.geode.kafka.utilities; import java.io.IOException; import java.util.Properties; diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java similarity index 97% rename from src/test/java/org/geode/kafka/WorkerAndHerderCluster.java rename to src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java index 022e381..7c58bc0 100644 --- a/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java +++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka; +package org.apache.geode.kafka.utilities; import java.io.IOException; diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java similarity index 79% rename from src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java rename to src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java index a3efc23..8003fc0 100644 --- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java +++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java @@ -12,10 +12,9 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka; +package org.apache.geode.kafka.utilities; -import static org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS; -import static org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS; +import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS; import java.io.IOException; import java.util.HashMap; @@ -32,29 +31,27 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; import org.apache.kafka.connect.util.ConnectUtils; -import org.geode.kafka.sink.GeodeKafkaSink; -import org.geode.kafka.source.GeodeKafkaSource; + +import org.apache.geode.kafka.GeodeConnectorConfig; +import org.apache.geode.kafka.sink.GeodeKafkaSink; +import org.apache.geode.kafka.source.GeodeKafkaSource; +import org.apache.geode.kafka.sink.GeodeSinkConnectorConfig; public class WorkerAndHerderWrapper { public static void main(String[] args) throws IOException { - String maxTasks = args[0]; - String offsetPath = "/tmp/connect.offsets"; - String regionToTopicBinding = GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS; - String topicToRegionBinding = GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS; - String sinkTopic = GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK; - String locatorString = null; - System.out.println("MaxTask " + maxTasks); - if (args.length == 7) { - String sourceRegion = args[1]; - String sinkRegion = args[2]; - String sourceTopic = args[3]; - sinkTopic = args[4]; - offsetPath = args[5]; - regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]"; - topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]"; - locatorString = args[6]; + if (args.length != 7) { + throw new RuntimeException("Insufficient arguments to start workers and herders"); } + String maxTasks = args[0]; + String sourceRegion = args[1]; + String sinkRegion = args[2]; + String sourceTopic = args[3]; + String sinkTopic = args[4]; + String offsetPath = args[5]; + String regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]"; + String topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]"; + String locatorString = args[6]; Map props = new HashMap(); props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); @@ -100,7 +97,7 @@ public class WorkerAndHerderWrapper { sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName()); sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector"); sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks); - sinkProps.put(TOPIC_TO_REGION_BINDINGS, topicToRegionBinding); + sinkProps.put(GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS, topicToRegionBinding); sinkProps.put(GeodeConnectorConfig.LOCATORS, locatorString); sinkProps.put("topics", sinkTopic); diff --git a/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java b/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java similarity index 98% rename from src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java rename to src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java index d7cb99a..8e5e7a9 100644 --- a/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java +++ b/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.geode.kafka; +package org.apache.geode.kafka.utilities; import java.io.IOException; import java.util.Properties; diff --git a/src/test/java/org/geode/kafka/GeodeContextTest.java b/src/test/java/org/geode/kafka/GeodeContextTest.java deleted file mode 100644 index eb10bee..0000000 --- a/src/test/java/org/geode/kafka/GeodeContextTest.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.geode.kafka; - -public class GeodeContextTest { -} diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java deleted file mode 100644 index 57d576d..0000000 --- a/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java +++ /dev/null @@ -1,412 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.geode.kafka; - -import static org.awaitility.Awaitility.await; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.time.Duration; -import java.util.Collections; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import kafka.admin.RackAwareMode; -import kafka.zk.AdminZkClient; -import kafka.zk.KafkaZkClient; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.connect.runtime.WorkerConfig; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import org.apache.geode.cache.Region; -import org.apache.geode.cache.client.ClientCache; -import org.apache.geode.cache.client.ClientCacheFactory; -import org.apache.geode.cache.client.ClientRegionShortcut; - -public class GeodeKafkaTestCluster { - - @ClassRule - public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - private static boolean debug = true; - - public static String TEST_REGION_TO_TOPIC_BINDINGS = "[someRegionForSource:someTopicForSource]"; - public static String TEST_TOPIC_TO_REGION_BINDINGS = "[someTopicForSink:someRegionForSink]"; - - public static String TEST_TOPIC_FOR_SOURCE = "someTopicForSource"; - public static String TEST_REGION_FOR_SOURCE = "someRegionForSource"; - public static String TEST_TOPIC_FOR_SINK = "someTopicForSink"; - public static String TEST_REGION_FOR_SINK = "someRegionForSink"; - - private static ZooKeeperLocalCluster zooKeeperLocalCluster; - private static KafkaLocalCluster kafkaLocalCluster; - private static GeodeLocalCluster geodeLocalCluster; - private static WorkerAndHerderCluster workerAndHerderCluster; - private static Consumer<String, String> consumer; - - @BeforeClass - public static void setup() - throws IOException, QuorumPeerConfig.ConfigException, InterruptedException { - startZooKeeper(); - startKafka(); - startGeode(); - } - - - @AfterClass - public static void shutdown() { - workerAndHerderCluster.stop(); - KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000, - 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null); - // AdminZkClient adminZkClient = new AdminZkClient(zkClient); - // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE); - // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK); - zkClient.close(); - kafkaLocalCluster.stop(); - geodeLocalCluster.stop(); - } - - - private static void startWorker(int maxTasks) throws IOException, InterruptedException { - workerAndHerderCluster = new WorkerAndHerderCluster(); - workerAndHerderCluster.start(String.valueOf(maxTasks)); - Thread.sleep(20000); - } - - private static void createTopic(String topicName, int numPartitions, int replicationFactor) { - KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000, - 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null); - - Properties topicProperties = new Properties(); - topicProperties.put("flush.messages", "1"); - AdminZkClient adminZkClient = new AdminZkClient(zkClient); - adminZkClient.createTopic(topicName, numPartitions, replicationFactor, topicProperties, - RackAwareMode.Disabled$.MODULE$); - } - - private static void deleteTopic(String topicName) { - KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000, - 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null); - AdminZkClient adminZkClient = new AdminZkClient(zkClient); - adminZkClient.deleteTopic(topicName); - } - - private ClientCache createGeodeClient() { - return new ClientCacheFactory().addPoolLocator("localhost", 10334).create(); - } - - private static void startZooKeeper() throws IOException, QuorumPeerConfig.ConfigException { - zooKeeperLocalCluster = new ZooKeeperLocalCluster(getZooKeeperProperties()); - zooKeeperLocalCluster.start(); - } - - private static void startKafka() - throws IOException, InterruptedException, QuorumPeerConfig.ConfigException { - kafkaLocalCluster = new KafkaLocalCluster(getKafkaConfig()); - kafkaLocalCluster.start(); - } - - private static void startGeode() throws IOException, InterruptedException { - geodeLocalCluster = new GeodeLocalCluster(); - geodeLocalCluster.start(); - } - - private static Properties getZooKeeperProperties() throws IOException { - Properties properties = new Properties(); - properties.setProperty("dataDir", - (debug) ? "/tmp/zookeeper" : temporaryFolder.newFolder("zookeeper").getAbsolutePath()); - properties.setProperty("clientPort", "2181"); - properties.setProperty("tickTime", "2000"); - return properties; - } - - - private static Properties getKafkaConfig() throws IOException { - int BROKER_PORT = 9092; - Properties props = new Properties(); - - props.put("broker.id", "0"); - props.put("zookeeper.connect", "localhost:2181"); - props.put("host.name", "localHost"); - props.put("port", BROKER_PORT); - props.put("offsets.topic.replication.factor", "1"); - - // Specifically GeodeKafka connector configs - return props; - } - - - // consumer props, less important, just for testing? - public static Consumer<String, String> createConsumer() { - final Properties props = new Properties(); - props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, - "myGroup"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - // Create the consumer using props. - final Consumer<String, String> consumer = - new KafkaConsumer<>(props); - // Subscribe to the topic. - consumer.subscribe(Collections.singletonList(TEST_TOPIC_FOR_SOURCE)); - return consumer; - } - - // consumer props, less important, just for testing? - public static Producer<String, String> createProducer() { - final Properties props = new Properties(); - props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getName()); - - // Create the producer using props. - final Producer<String, String> producer = - new KafkaProducer<>(props); - return producer; - } - - @Test - public void endToEndSourceTest() throws Exception { - try { - createTopic(TEST_TOPIC_FOR_SOURCE, 1, 1); - startWorker(1); - consumer = createConsumer(); - - ClientCache client = createGeodeClient(); - Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY) - .create(TEST_REGION_FOR_SOURCE); - - for (int i = 0; i < 10; i++) { - region.put("KEY" + i, "VALUE" + i); - } - - AtomicInteger valueReceived = new AtomicInteger(0); - await().atMost(10, TimeUnit.SECONDS).until(() -> { - ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); - for (ConsumerRecord<String, String> record : records) { - valueReceived.incrementAndGet(); - } - return valueReceived.get() == 10; - }); - } finally { - deleteTopic(TEST_TOPIC_FOR_SOURCE); - } - } - - - @Test - public void endToEndSourceSingleRegionMultiTaskMultiPartitionTest() throws Exception { - try { - createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1); - startWorker(1); - consumer = createConsumer(); - - ClientCache client = createGeodeClient(); - Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY) - .create(TEST_REGION_FOR_SOURCE); - - for (int i = 0; i < 10; i++) { - region.put("KEY" + i, "VALUE" + i); - } - - AtomicInteger valueReceived = new AtomicInteger(0); - await().atMost(10, TimeUnit.SECONDS).until(() -> { - ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); - for (ConsumerRecord<String, String> record : records) { - valueReceived.incrementAndGet(); - } - return valueReceived.get() == 10; - }); - } finally { - deleteTopic(TEST_TOPIC_FOR_SOURCE); - } - } - - @Test - public void endToEndSourceSingleRegionMultiTaskMultiPartitionWithMoreTasksThanPartitionsTest() - throws Exception { - try { - createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1); - startWorker(5); - consumer = createConsumer(); - - ClientCache client = createGeodeClient(); - Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY) - .create(TEST_REGION_FOR_SOURCE); - - for (int i = 0; i < 10; i++) { - region.put("KEY" + i, "VALUE" + i); - } - - AtomicInteger valueReceived = new AtomicInteger(0); - await().atMost(10, TimeUnit.SECONDS).until(() -> { - ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); - for (ConsumerRecord<String, String> record : records) { - valueReceived.incrementAndGet(); - } - return valueReceived.get() == 10; - }); - } finally { - deleteTopic(TEST_TOPIC_FOR_SOURCE); - } - } - - @Test - public void endToEndSinkTest() throws Exception { - try { - createTopic(TEST_TOPIC_FOR_SINK, 1, 1); - startWorker(1); - consumer = createConsumer(); - - ClientCache client = createGeodeClient(); - Region region = - client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK); - - Producer<String, String> producer = createProducer(); - for (int i = 0; i < 10; i++) { - producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i)); - } - - int i = 0; - await().atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); - } finally { - deleteTopic(TEST_TOPIC_FOR_SINK); - } - } - - - @Test - public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicSinkTest() - throws Exception { - try { - createTopic(TEST_TOPIC_FOR_SINK, 10, 1); - startWorker(5); - consumer = createConsumer(); - - ClientCache client = createGeodeClient(); - Region region = - client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK); - - Producer<String, String> producer = createProducer(); - for (int i = 0; i < 10; i++) { - producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i)); - } - - int i = 0; - await().atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); - } finally { - deleteTopic(TEST_TOPIC_FOR_SINK); - } - } - - @Test - public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicWithMoreWorkersSinkTest() - throws Exception { - try { - createTopic(TEST_TOPIC_FOR_SINK, 10, 1); - startWorker(15); - consumer = createConsumer(); - - ClientCache client = createGeodeClient(); - Region region = - client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK); - - Producer<String, String> producer = createProducer(); - for (int i = 0; i < 10; i++) { - producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i)); - } - - int i = 0; - await().atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); - } finally { - deleteTopic(TEST_TOPIC_FOR_SINK); - } - } - - @Test - public void endToEndWithOneTaskForASingleBindingLessTasksThanPartitions() throws Exception { - try { - createTopic(TEST_TOPIC_FOR_SINK, 10, 1); - startWorker(5); - consumer = createConsumer(); - - ClientCache client = createGeodeClient(); - Region region = - client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK); - - Producer<String, String> producer = createProducer(); - for (int i = 0; i < 10; i++) { - producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i)); - } - - int i = 0; - await().atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); - } finally { - deleteTopic(TEST_TOPIC_FOR_SINK); - } - } - - @Test - public void endToEndWithOneTaskForASingleBindingMoreTasksThanPartitions() throws Exception { - try { - createTopic(TEST_TOPIC_FOR_SINK, 10, 1); - startWorker(5); - consumer = createConsumer(); - - ClientCache client = createGeodeClient(); - Region region = - client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK); - - Producer<String, String> producer = createProducer(); - for (int i = 0; i < 10; i++) { - producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, i, UUID.randomUUID().toString(), - UUID.randomUUID().toString())); - } - - int i = 0; - await().atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); - } finally { - deleteTopic(TEST_TOPIC_FOR_SINK); - } - } - -} diff --git a/src/test/java/org/geode/kafka/GeodeLocalCluster.java b/src/test/java/org/geode/kafka/GeodeLocalCluster.java deleted file mode 100644 index 6784391..0000000 --- a/src/test/java/org/geode/kafka/GeodeLocalCluster.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.geode.kafka; - -import java.io.IOException; - -public class GeodeLocalCluster { - - private JavaProcess locatorProcess; - private JavaProcess serverProcess; - - public GeodeLocalCluster() { - locatorProcess = new JavaProcess(LocatorLauncherWrapper.class); - serverProcess = new JavaProcess(ServerLauncherWrapper.class); - } - - public void start() throws IOException, InterruptedException { - System.out.println("starting locator"); - locatorProcess.exec("10334"); - Thread.sleep(15000); - serverProcess.exec("40404"); - Thread.sleep(30000); - } - - public void stop() { - serverProcess.destroy(); - locatorProcess.destroy(); - } -} diff --git a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java deleted file mode 100644 index f24367c..0000000 --- a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.geode.kafka; - -import java.io.IOException; -import java.util.Properties; - -import org.apache.geode.distributed.ConfigurationProperties; -import org.apache.geode.distributed.Locator; - -public class LocatorLauncherWrapper { - - public static void main(String[] args) throws IOException { - Properties properties = new Properties(); - // String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath(); - // properties.setProperty(ConfigurationPropert/**/ies.STATISTIC_ARCHIVE_FILE, statsFile); - properties.setProperty(ConfigurationProperties.NAME, "locator1"); - - Locator.startLocatorAndDS(10334, - null/* new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log") */, properties); - while (true) { - - } - // - // LocatorLauncher locatorLauncher = new LocatorLauncher.Builder() - // .setMemberName("locator1") - //// .setPort(Integer.valueOf(args[0])) - //// .setBindAddress("localhost") - // .build(); - // - // locatorLauncher.start(); - // while (!locatorLauncher.isRunning()) { - // - // } - // System.out.println(locatorLauncher.getBindAddress() + ":" + locatorLauncher.getPort()); - - } -} diff --git a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java deleted file mode 100644 index 4ab75cd..0000000 --- a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.geode.kafka; - -import java.io.IOException; -import java.util.Properties; - -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.RegionShortcut; -import org.apache.geode.cache.server.CacheServer; -import org.apache.geode.distributed.ConfigurationProperties; - -public class ServerLauncherWrapper { - - public static void main(String... args) throws IOException { - // ServerLauncher serverLauncher = new ServerLauncher.Builder() - // .setMemberName("server1") - //// .setServerPort(Integer.valueOf(args[0])) - //// .setServerBindAddress("localhost") - // // .set("locators", "localhost[10334]") - //// .set("jmx-manager", "true") - //// .set("jmx-manager-start", "true") - // .build(); - // - // serverLauncher.start(); - // System.out.println("Geode Server Launcher complete"); - - - - Properties properties = new Properties(); - String locatorString = "localhost[10334]"; - // String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath(); - Cache cache = new CacheFactory(properties) - // .setPdxSerializer(new ReflectionBasedAutoSerializer("benchmark.geode.data.*")) - .set(ConfigurationProperties.LOCATORS, locatorString) - .set(ConfigurationProperties.NAME, - "server-1") - // .set(ConfigurationProperties.LOG_FILE, - // "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log") - .set(ConfigurationProperties.LOG_LEVEL, "info") - // .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile) - .create(); - CacheServer cacheServer = cache.addCacheServer(); - cacheServer.setPort(0); - cacheServer.start(); - - // create the region - cache.createRegionFactory(RegionShortcut.PARTITION).create( - GeodeKafkaTestCluster.TEST_REGION_FOR_SINK); - cache.createRegionFactory(RegionShortcut.PARTITION).create( - GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE); - System.out.println("starting cacheserver"); - while (true) { - - } - } -}