This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 70328411399865005cddfdc19bd63f9fc7c443c3 Author: Jason Huynh <[email protected]> AuthorDate: Wed Jan 22 17:32:25 2020 -0800 end to end test "passes" Not sure why we dont get all the events or don't get any events until enough polls are called --- build.gradle | 7 +- src/main/java/kafka/GeodeKafkaSource.java | 42 +++- src/main/java/kafka/GeodeKafkaSourceTask.java | 270 +++++++++++++++--------- src/test/java/kafka/GeodeKafkaTestCluster.java | 90 +++----- src/test/java/kafka/LocatorLauncherWrapper.java | 3 +- src/test/java/kafka/ServerLauncherWrapper.java | 2 +- 6 files changed, 243 insertions(+), 171 deletions(-) diff --git a/build.gradle b/build.gradle index c9500b0..87a2c36 100644 --- a/build.gradle +++ b/build.gradle @@ -13,8 +13,8 @@ repositories { dependencies { - compile 'org.apache.geode:geode-core:1.11.0' - compile 'org.apache.geode:geode-cq:1.11.0' + compile 'org.apache.geode:geode-core:1.10.0' + compile 'org.apache.geode:geode-cq:1.10.0' compile(group: 'org.apache.kafka', name: 'connect-api', version: '2.3.1') compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0' compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0' @@ -27,4 +27,7 @@ dependencies { testCompile group: 'junit', name: 'junit', version: '4.12' + testImplementation 'org.awaitility:awaitility:4.0.2' + + } diff --git a/src/main/java/kafka/GeodeKafkaSource.java b/src/main/java/kafka/GeodeKafkaSource.java index 8b88e81..95afc50 100644 --- a/src/main/java/kafka/GeodeKafkaSource.java +++ b/src/main/java/kafka/GeodeKafkaSource.java @@ -10,11 +10,23 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class GeodeKafkaSource extends SourceConnector { +import static kafka.GeodeConnectorConfig.BATCH_SIZE; +import static kafka.GeodeConnectorConfig.CQ_PREFIX; +import static kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE; +import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; +import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID; +import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT; +import static kafka.GeodeConnectorConfig.DEFAULT_LOCATOR; +import static kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE; +import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; +import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; +import static kafka.GeodeConnectorConfig.LOCATORS; +import static kafka.GeodeConnectorConfig.QUEUE_SIZE; +import static kafka.GeodeConnectorConfig.REGIONS; +import static kafka.GeodeConnectorConfig.TOPICS; +import static kafka.GeodeKafkaSourceTask.TASK_ID; - public static String REGION_NAME = "GEODE_REGION_NAME"; - private String regionName; - private static String TOPICS = "TOPICS"; +public class GeodeKafkaSource extends SourceConnector { private Map<String, String> sharedProps; private static final ConfigDef CONFIG_DEF = new ConfigDef(); @@ -34,9 +46,11 @@ public class GeodeKafkaSource extends SourceConnector { taskProps.putAll(sharedProps); // use the same props for all tasks at the moment - for (int i = 0; i < maxTasks; i++) + for (int i = 0; i < maxTasks; i++) { + //TODO partition regions and topics + taskProps.put(TASK_ID, "" + i); taskConfigs.add(taskProps); - + } return taskConfigs; } @@ -48,7 +62,17 @@ public class GeodeKafkaSource extends SourceConnector { @Override public void start(Map<String, String> props) { - sharedProps = props; + sharedProps = computeMissingConfigurations(props); + } + + private Map<String, String> computeMissingConfigurations(Map<String, String> props) { + props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR); + props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT); + props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID); + props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE); + props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE); + props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX); + return props; } @Override @@ -60,4 +84,8 @@ public class GeodeKafkaSource extends SourceConnector { public String version() { return AppInfoParser.getVersion(); } + + public Map<String, String> getSharedProps() { + return sharedProps; + } } diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java index 9608f07..c463e6f 100644 --- a/src/main/java/kafka/GeodeKafkaSourceTask.java +++ b/src/main/java/kafka/GeodeKafkaSourceTask.java @@ -4,134 +4,196 @@ import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.query.CqAttributes; import org.apache.geode.cache.query.CqAttributesFactory; -import org.apache.geode.cache.query.CqEvent; import org.apache.geode.cache.query.CqException; import org.apache.geode.cache.query.CqExistsException; -import org.apache.geode.cache.query.CqListener; import org.apache.geode.cache.query.RegionNotFoundException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static kafka.GeodeConnectorConfig.BATCH_SIZE; +import static kafka.GeodeConnectorConfig.CQ_PREFIX; +import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; +import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID; +import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT; +import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; +import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; +import static kafka.GeodeConnectorConfig.QUEUE_SIZE; +import static kafka.GeodeConnectorConfig.REGION_NAME; public class GeodeKafkaSourceTask extends SourceTask { - private static String REGION_NAME = "REGION_NAME"; - private static String OFFSET = "OFFSET"; - private static String topics[]; - private int batchSize; - private int queueSize; - private static BlockingQueue<CqEvent> eventBuffer; - private Map<String, String> sourcePartition; - private Map<String, Long> offset; - - private ClientCache clientCache; - - @Override - public String version() { - return null; - } - - @Override - public void start(Map<String, String> props) { - System.out.println("JASON task start"); - batchSize = 100; - queueSize = 100000; - String regionName = "someRegion"; - eventBuffer = new LinkedBlockingQueue<>(queueSize); - topics = new String[] {"someTopic"}; - sourcePartition = new HashMap<>(); - sourcePartition.put(REGION_NAME, regionName); - - offset = new HashMap<>(); - offset.put("OFFSET", 0L); - - installOnGeode("localHost", 10334, "someRegion"); - System.out.println("JASON task start end"); - } - - @Override - public List<SourceRecord> poll() throws InterruptedException { -// System.out.println("JASON polling"); - ArrayList<SourceRecord> records = new ArrayList<>(batchSize); - ArrayList<CqEvent> events = new ArrayList<>(batchSize); - if (eventBuffer.drainTo(events, batchSize) > 0) { - for (CqEvent event : events) { - - for (String topic : topics) - records.add(new SourceRecord(sourcePartition, offset, topic, null, event)); - } - - System.out.println("JASON we polled and returning records" + records.size()); - return records; - } -// System.out.println("JASON we didn't poll any records"); - return null; - } - - @Override - public void stop() { - clientCache.close(true); - } - - private void installOnGeode(String locatorHost, int locatorPort, String regionName) { - clientCache = new ClientCacheFactory().set("durable-client-id", "someClient") - .set("durable-client-timeout", "200") - .setPoolSubscriptionEnabled(true).addPoolLocator(locatorHost, locatorPort).create(); - CqAttributesFactory cqAttributesFactory = new CqAttributesFactory(); - cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener()); - System.out.println("JASON installing on Geode"); - CqAttributes cqAttributes = cqAttributesFactory.create(); - try { - System.out.println("JASON installing new cq"); - clientCache.getQueryService().newCq("kafkaCQFor" + regionName, "select * from /" + regionName, cqAttributes, - true).execute(); - System.out.println("JASON finished installing cq"); - } catch (CqExistsException e) { - System.out.println("UHH"); - e.printStackTrace(); - } catch (CqException | RegionNotFoundException e) { - System.out.println("UHH e"); - e.printStackTrace(); + //property string to pass in to identify task + public static final String TASK_ID = "GEODE_TASK_ID"; + private static final String TASK_PREFIX = "TASK"; + private static final String DOT = "."; + private static final Map<String, Long> OFFSET_DEFAULT = createOffset(); + + private int taskId; + private ClientCache clientCache; + private List<String> regionNames; + private List<String> topics; + private Map<String, Map<String, String>> sourcePartitions; + private static BlockingQueue<GeodeEvent> eventBuffer; + private int batchSize; + + + private static Map<String, Long> createOffset() { + Map<String, Long> offset = new HashMap<>(); + offset.put("OFFSET", 0L); + return offset; } - catch (Exception e) { - System.out.println("UHHHHHH " + e); + + @Override + public String version() { + return null; } - System.out.println("JASON task calling ready for events"); - clientCache.readyForEvents(); - System.out.println("JASON task ready for events"); - } - private static class GeodeKafkaSourceListener implements CqListener { + @Override + public void start(Map<String, String> props) { + try { + System.out.println("JASON task start"); + taskId = Integer.parseInt(props.get(TASK_ID)); + batchSize = Integer.parseInt(props.get(BATCH_SIZE)); + int queueSize = Integer.parseInt(props.get(QUEUE_SIZE)); + eventBuffer = new LinkedBlockingQueue<>(queueSize); + + //grouping will be done in the source and not the task + regionNames = parseNames(props.get(GeodeConnectorConfig.REGIONS)); + topics = parseNames(props.get(GeodeConnectorConfig.TOPICS)); + sourcePartitions = createSourcePartitionsMap(regionNames); + + String durableClientId = props.get(DURABLE_CLIENT_ID_PREFIX); + if (!durableClientId.equals("")) { + durableClientId += taskId; + } + System.out.println("JASON durable client id is:" + durableClientId); + String durableClientTimeout = props.get(DURABLE_CLIENT_TIME_OUT); + String cqPrefix = props.get(CQ_PREFIX); + + List<LocatorHostPort> locators = parseLocators(props.get(GeodeConnectorConfig.LOCATORS)); + installOnGeode(taskId, eventBuffer, locators, regionNames, durableClientId, durableClientTimeout, cqPrefix); + System.out.println("JASON task start finished"); + } + catch (Exception e) { + System.out.println("Exception:" + e); + e.printStackTrace(); + throw e; + } + } @Override - public void onEvent(CqEvent aCqEvent) { - try { - System.out.println("JASON cqEvent and putting into eventBuffer"); - eventBuffer.offer(aCqEvent, 2, TimeUnit.SECONDS); - } catch (InterruptedException e) { - - while (true) { - try { - if (!eventBuffer.offer(aCqEvent, 2, TimeUnit.SECONDS)) - break; - } catch (InterruptedException ex) { - ex.printStackTrace(); - } - System.out.println("GeodeKafkaSource Queue is full"); + public List<SourceRecord> poll() throws InterruptedException { + ArrayList<SourceRecord> records = new ArrayList<>(batchSize); + ArrayList<GeodeEvent> events = new ArrayList<>(batchSize); + if (eventBuffer.drainTo(events, batchSize) > 0) { + for (GeodeEvent event : events) { + for (String topic : topics) { + records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent())); +// records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, "STRING")); + } + } + + return records; } - } + + return null; } @Override - public void onError(CqEvent aCqEvent) { + public void stop() { + clientCache.close(true); + } + + ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) { + ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName) + .set("durable-client-timeout", durableClientTimeOut) + .setPoolSubscriptionEnabled(true); + for (LocatorHostPort locator: locators) { + ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create(); + } + return ccf.create(); + } + + void installOnGeode(int taskId, BlockingQueue<GeodeEvent> eventBuffer, List<LocatorHostPort> locators, List<String> regionNames, String durableClientId, String durableClientTimeout, String cqPrefix) { + boolean isDurable = isDurable(durableClientId); + + clientCache = createClientCache(locators, durableClientId, durableClientTimeout); + for (String region : regionNames) { + installListenersToRegion(taskId, eventBuffer, region, cqPrefix, isDurable); + } + if (isDurable) { + clientCache.readyForEvents(); + } + } + + void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) { + CqAttributesFactory cqAttributesFactory = new CqAttributesFactory(); + cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName)); + System.out.println("JASON installing on Geode"); + CqAttributes cqAttributes = cqAttributesFactory.create(); + try { + System.out.println("JASON installing new cq"); + clientCache.getQueryService().newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, + isDurable).execute(); + System.out.println("JASON finished installing cq"); + } catch (CqExistsException e) { + System.out.println("UHH"); + e.printStackTrace(); + } catch (CqException | RegionNotFoundException e) { + System.out.println("UHH e"); + e.printStackTrace(); + } catch (Exception e) { + System.out.println("UHHHHHH " + e); + } + } + + + List<String> parseNames(String names) { + return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList()); + } + + List<LocatorHostPort> parseLocators(String locators) { + return Arrays.stream(locators.split(",")).map((s) -> { + String locatorString = s.trim(); + return parseLocator(locatorString); + }).collect(Collectors.toList()); + } + + private LocatorHostPort parseLocator(String locatorString) { + String[] splits = locatorString.split("\\["); + String locator = splits[0]; + int port = Integer.parseInt(splits[1].replace("]", "")); + return new LocatorHostPort(locator, port); + } + + boolean isDurable(String durableClientId) { + return !durableClientId.equals(""); + } + + String generateCqName(int taskId, String cqPrefix, String regionName) { + return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName; + } + /** + * converts a list of regions names into a map of source partitions + * + * @param regionNames list of regionNames + * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name + */ + Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) { + return regionNames.stream().map(regionName -> { + Map<String, String> sourcePartition = new HashMap<>(); + sourcePartition.put(REGION_NAME, regionName); + return sourcePartition; + }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s)); } - } } diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/kafka/GeodeKafkaTestCluster.java index 2b0293f..85c0fc4 100644 --- a/src/test/java/kafka/GeodeKafkaTestCluster.java +++ b/src/test/java/kafka/GeodeKafkaTestCluster.java @@ -1,11 +1,8 @@ package kafka; -import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; -import kafka.zookeeper.ZooKeeperClient; -import org.I0Itec.zkclient.ZkClient; import org.apache.geode.cache.Region; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; @@ -16,20 +13,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; -import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.ConnectorConfig; -import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfig; -import org.apache.kafka.connect.runtime.isolation.Plugins; -import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; -import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; -import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; -import org.apache.kafka.connect.storage.StringConverter; -import org.apache.kafka.connect.util.ConnectUtils; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -40,10 +26,11 @@ import org.junit.rules.TemporaryFolder; import java.io.IOException; import java.time.Duration; import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.awaitility.Awaitility.await; public class GeodeKafkaTestCluster { @@ -51,6 +38,9 @@ public class GeodeKafkaTestCluster { public static TemporaryFolder temporaryFolder = new TemporaryFolder(); private static boolean debug = true; + public static String TEST_TOPICS = "someTopic"; + public static String TEST_REGIONS = "someRegion"; + private static ZooKeeperLocalCluster zooKeeperLocalCluster; private static KafkaLocalCluster kafkaLocalCluster; private static GeodeLocalCluster geodeLocalCluster; @@ -63,7 +53,7 @@ public class GeodeKafkaTestCluster { startKafka(); startGeode(); createTopic(); - Thread.sleep(5000); + startWorker(); consumer = createConsumer(); Thread.sleep(5000); @@ -74,9 +64,8 @@ public class GeodeKafkaTestCluster { workerAndHerderCluster.stop(); KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000, 15000,10, Time.SYSTEM, "myGroup","myMetricType", null); - AdminZkClient adminZkClient = new AdminZkClient(zkClient); - adminZkClient.deleteTopic("someTopic"); + adminZkClient.deleteTopic(TEST_TOPICS); kafkaLocalCluster.stop(); geodeLocalCluster.stop(); @@ -92,8 +81,12 @@ public class GeodeKafkaTestCluster { private static void createTopic() { KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000, 15000,10, Time.SYSTEM, "myGroup","myMetricType", null); + + Properties topicProperties = new Properties(); + topicProperties.put("flush.messages", "1"); AdminZkClient adminZkClient = new AdminZkClient(zkClient); - adminZkClient.createTopic("someTopic",3,1, new Properties(), RackAwareMode.Disabled$.MODULE$); + adminZkClient.createTopic(TEST_TOPICS,3 + ,1, topicProperties, RackAwareMode.Disabled$.MODULE$); } private ClientCache createGeodeClient() { @@ -125,15 +118,15 @@ public class GeodeKafkaTestCluster { private static Properties getKafkaConfig() throws IOException { - int BROKER_PORT = 8888; + int BROKER_PORT = 9092; Properties props = new Properties(); props.put("broker.id", "0"); + props.put("log4j.configuration", "/Users/jhuynh/Pivotal/kafka/config/connect-log4j.properties"); props.put("zookeeper.connect", "localhost:2181"); props.put("host.name", "localHost"); props.put("port", BROKER_PORT); props.put("offsets.topic.replication.factor", "1"); - props.put("log.dir", (debug)? "/tmp/kafka" : temporaryFolder.newFolder("kafka").getAbsolutePath()); props.put("log.flush.interval.messages", "1"); props.put("log.flush.interval.ms", "10"); @@ -144,22 +137,6 @@ public class GeodeKafkaTestCluster { props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); //Specifically GeodeKafka connector configs - - - -/* -name=file-source -# The class implementing the connector -connector.class=FileStreamSource -# Maximum number of tasks to run for this connector instance -tasks.max=1 -# The input file (path relative to worker's working directory) -# This is the only setting specific to the FileStreamSource -file=test.txt -# The output topic in Kafka -topic=connect-test - */ - return props; } @@ -167,37 +144,38 @@ topic=connect-test //consumer props, less important, just for testing? public static Consumer<String,String> createConsumer() { final Properties props = new Properties(); - props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888"); + props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - // Create the consumer using props. + + // Create the consumer using props. final Consumer<String, String> consumer = new KafkaConsumer<>(props); // Subscribe to the topic. - consumer.subscribe(Collections.singletonList("someTopic")); + consumer.subscribe(Collections.singletonList(TEST_TOPICS)); return consumer; } @Test - public void testX() throws InterruptedException { + public void endToEndSourceTest() { ClientCache client = createGeodeClient(); - Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create("someRegion"); - region.put("JASON KEY", "JASON VALUE"); - System.out.println("PUT COMPLETE!"); - region.get("JASON KEY"); -// client.close(); - - - ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); - for (ConsumerRecord<String, String> record: records) { - System.out.println("JASON we consumed a record:" + record); - } - System.out.println("TEST COMPLETE!"); - + Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS); + + //right now just verify something makes it end to end + AtomicInteger valueReceived = new AtomicInteger(0); + await().atMost(10, TimeUnit.SECONDS).until(() -> { + region.put("KEY", "VALUE" + System.currentTimeMillis()); + ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(4)); + for (ConsumerRecord<String, String> record: records) { +// System.out.println("WE consumed a record:" + record); + valueReceived.incrementAndGet(); + } + return valueReceived.get() > 0; + }); } } diff --git a/src/test/java/kafka/LocatorLauncherWrapper.java b/src/test/java/kafka/LocatorLauncherWrapper.java index b4340c4..c1a7075 100644 --- a/src/test/java/kafka/LocatorLauncherWrapper.java +++ b/src/test/java/kafka/LocatorLauncherWrapper.java @@ -16,7 +16,8 @@ public class LocatorLauncherWrapper { // String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath(); // properties.setProperty(ConfigurationPropert/**/ies.STATISTIC_ARCHIVE_FILE, statsFile); properties.setProperty(ConfigurationProperties.NAME, "locator1"); - Locator.startLocatorAndDS(10334, new File("/Users/jhuynh/Pivotal/geode-kafka-connector/"), properties); + + Locator.startLocatorAndDS(10334, new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log"), properties); while (true) { } diff --git a/src/test/java/kafka/ServerLauncherWrapper.java b/src/test/java/kafka/ServerLauncherWrapper.java index 7493d6b..933824e 100644 --- a/src/test/java/kafka/ServerLauncherWrapper.java +++ b/src/test/java/kafka/ServerLauncherWrapper.java @@ -37,7 +37,7 @@ public class ServerLauncherWrapper { .set(ConfigurationProperties.LOCATORS, locatorString) .set(ConfigurationProperties.NAME, "server-1") - .set(ConfigurationProperties.LOG_FILE, "/Users/jhuynh/Pivotal/geode-kafka-connector/") + .set(ConfigurationProperties.LOG_FILE, "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log") .set(ConfigurationProperties.LOG_LEVEL, "info") // .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile) .create();
