http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java ---------------------------------------------------------------------- diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java new file mode 100755 index 0000000..e759ecc --- /dev/null +++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java @@ -0,0 +1,488 @@
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.odf.core.messaging.kafka;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.atlas.odf.api.OpenDiscoveryFramework;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
import org.apache.atlas.odf.api.engine.KafkaGroupOffsetInfo;
import org.apache.atlas.odf.api.engine.KafkaStatus;
import org.apache.atlas.odf.api.engine.KafkaTopicStatus;
import org.apache.atlas.odf.api.engine.ThreadStatus;
import org.apache.atlas.odf.api.settings.KafkaMessagingConfiguration;
import org.apache.atlas.odf.api.settings.SettingsManager;
import org.apache.atlas.odf.core.ODFInternalFactory;
import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
import org.apache.atlas.odf.core.notification.NotificationListener;
import org.apache.atlas.odf.json.JSONUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.wink.json4j.JSONException;

import org.apache.atlas.odf.api.ODFFactory;
import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
import org.apache.atlas.odf.api.engine.MessagingStatus;
import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
import org.apache.atlas.odf.core.Environment;
import org.apache.atlas.odf.core.controlcenter.AdminMessage;
import org.apache.atlas.odf.core.controlcenter.AdminQueueProcessor;
import org.apache.atlas.odf.core.controlcenter.ConfigChangeQueueProcessor;
import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore.StatusQueueProcessor;
import org.apache.atlas.odf.core.controlcenter.DiscoveryServiceStarter;
import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
import org.apache.atlas.odf.core.controlcenter.ThreadManager;
import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
import org.apache.atlas.odf.core.controlcenter.TrackerUtil;
import org.apache.atlas.odf.core.notification.NotificationManager;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.common.TopicExistsException;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

/**
 * Kafka-backed implementation of {@link DiscoveryServiceQueueManager}.
 *
 * <p>It creates the Kafka topics it needs on demand, starts one consumer thread per
 * status / admin / runtime / notification queue through the {@link ThreadManager}, and
 * serializes trackers, status entries and admin messages to JSON before handing them to
 * the {@link KafkaProducerManager}.
 *
 * <p>{@link #start()} and {@link #stop()} are serialized on a class-wide lock and share
 * the class-wide list of started consumer thread names.
 */
public class KafkaQueueManager implements DiscoveryServiceQueueManager {

    public static final String TOPIC_NAME_STATUS_QUEUE = "odf-status-topic";
    public static final String TOPIC_NAME_ADMIN_QUEUE = "odf-admin-topic";
    public static final String ADMIN_QUEUE_KEY = "odf-admin-queue-key";
    public static final String SERVICE_TOPIC_PREFIX = "odf-topic-";

    public static final RackAwareMode DEFAULT_RACK_AWARE_MODE = RackAwareMode.Disabled$.MODULE$;

    //use static UUID so that no unnecessary consumer threads are started
    private final static String UNIQUE_SESSION_THREAD_ID = UUID.randomUUID().toString();

    private final static int THREAD_STARTUP_TIMEOUT_MS = 5000;

    // Thread ids recorded by start(); null means "not started". Guarded by startLock.
    private static List<String> queueConsumerNames = null;
    private static final Object startLock = new Object();

    private final static Logger logger = Logger.getLogger(KafkaQueueManager.class.getName());

    private final ThreadManager threadManager;
    private final SettingsManager odfConfig;
    private final String zookeeperConnectString;

    /**
     * Wires the manager from the {@link ODFInternalFactory}: thread manager (with a fresh
     * executor service), Zookeeper connect string and settings manager.
     */
    public KafkaQueueManager() {
        ODFInternalFactory factory = new ODFInternalFactory();
        threadManager = factory.create(ThreadManager.class);
        ExecutorServiceFactory esf = factory.create(ExecutorServiceFactory.class);
        threadManager.setExecutorService(esf.createExecutorService());
        zookeeperConnectString = factory.create(Environment.class).getZookeeperConnectString();
        odfConfig = factory.create(SettingsManager.class);
    }

    /**
     * Builds the Kafka consumer configuration for one consumer group.
     *
     * @param consumerGroupID value stored under {@code group.id}
     * @param consumeFromEnd  {@code true} sets the auto offset reset to {@code latest},
     *                        {@code false} to {@code earliest}
     * @return the properties obtained from the settings manager, extended with group id,
     *         Zookeeper connect string (if known), String (de)serializers, bootstrap servers
     *         and auto-commit disabled
     */
    public Properties getConsumerConfigProperties(String consumerGroupID, boolean consumeFromEnd) {
        Properties kafkaConsumerProps = odfConfig.getKafkaConsumerProperties();
        kafkaConsumerProps.put("group.id", consumerGroupID);
        if (zookeeperConnectString != null) {
            kafkaConsumerProps.put("zookeeper.connect", zookeeperConnectString);
        }
        kafkaConsumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumeFromEnd ? "latest" : "earliest");
        kafkaConsumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return kafkaConsumerProps;
    }

    /**
     * @return the brokers reported by the {@link KafkaMonitor} as a comma separated list
     *         (empty string if there are none)
     */
    private String getBootstrapServers() {
        final List<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperConnectString);
        StringBuilder servers = new StringBuilder();
        final Iterator<String> iterator = brokers.iterator();
        while (iterator.hasNext()) {
            servers.append(iterator.next());
            if (iterator.hasNext()) {
                servers.append(",");
            }
        }
        return servers.toString();
    }

    /**
     * Creates the given topic unless it already exists.
     *
     * @param topicName      topic to check / create
     * @param partitionCount number of partitions used when the topic is created
     * @param props          must contain {@code zookeeper.connect},
     *                       {@code zookeeperSessionTimeoutMs} and {@code zookeeperConnectionTimeoutMs}
     * @throws ZkTimeoutException if the Zookeeper connection cannot be established; the
     *                            failure is logged and rethrown instead of continuing with a
     *                            null client
     */
    protected void createTopicIfNotExists(String topicName, int partitionCount, Properties props) {
        String zkHosts = props.getProperty("zookeeper.connect");
        final ZkClient zkClient;
        try {
            zkClient = new ZkClient(zkHosts, Integer.valueOf(props.getProperty("zookeeperSessionTimeoutMs")),
                    Integer.valueOf(props.getProperty("zookeeperConnectionTimeoutMs")), ZKStringSerializer$.MODULE$);
        } catch (ZkTimeoutException zkte) {
            logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zkHosts);
            // Without a client nothing below can work; surface the real cause.
            throw zkte;
        }
        try {
            logger.log(Level.FINEST, "Checking if topic ''{0}'' already exists", topicName);
            // no special per-topic config needed
            final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts), false);
            if (!AdminUtils.topicExists(zkUtils, topicName)) {
                logger.log(Level.INFO, "Topic ''{0}'' does not exist, creating it", topicName);

                //FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts?
                KafkaMessagingConfiguration kafkaConfig = ((KafkaMessagingConfiguration) odfConfig.getODFSettings().getMessagingConfiguration());
                AdminUtils.createTopic(zkUtils, topicName, partitionCount, kafkaConfig.getKafkaBrokerTopicReplication(),
                        new Properties(), DEFAULT_RACK_AWARE_MODE);
                logger.log(Level.FINE, "Topic ''{0}'' created", topicName);
                //wait before continuing to make sure the topic exists BEFORE consumers are started
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    logger.log(Level.FINE, "Interrupted while waiting for the topic to become available", e);
                }
            }
        } catch (TopicExistsException ex) {
            logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName);
        } finally {
            zkClient.close();
        }
    }

    /** @return the topic name used for the given runtime: {@code "odf-runtime-" + name} */
    private String getTopicName(ServiceRuntime runtime) {
        return "odf-runtime-" + runtime.getName();
    }

    /** @return the consumer group used for the given runtime: topic name + {@code "_group"} */
    private String getConsumerGroup(ServiceRuntime runtime) {
        return getTopicName(runtime) + "_group";
    }

    /**
     * Placeholder result for a consumer thread that is already running: reports that no new
     * thread was created and that the thread is ready.
     *
     * @param threadId the name the running thread is registered under in the thread manager
     */
    private static ThreadStartupResult alreadyRunning(String threadId) {
        return new ThreadStartupResult(threadId) {
            @Override
            public boolean isNewThreadCreated() {
                return false;
            }

            @Override
            public boolean isReady() {
                return true;
            }
        };
    }

    /** Schedules the runtime consumers of every active runtime. */
    private List<ThreadStartupResult> scheduleAllRuntimeConsumers() {
        List<ThreadStartupResult> results = new ArrayList<>();
        for (ServiceRuntime runtime : ServiceRuntimes.getActiveRuntimes()) {
            results.addAll(scheduleRuntimeConsumers(runtime));
        }
        return results;
    }

    /**
     * Starts the {@link KafkaRuntimeConsumer} for one runtime unless its thread is already
     * running; the topic is created first if necessary.
     */
    private List<ThreadStartupResult> scheduleRuntimeConsumers(ServiceRuntime runtime) {
        logger.log(Level.FINER, "Create consumers on queue for runtime ''{0}'' if it doesn't already exist", runtime.getName());

        String topicName = getTopicName(runtime);
        String consumerGroupId = getConsumerGroup(runtime);
        Properties kafkaConsumerProps = getConsumerConfigProperties(consumerGroupId, false); // read entries from beginning if consumer was never initialized
        String threadName = "RuntimeQueueConsumer" + topicName;
        List<ThreadStartupResult> result = new ArrayList<ThreadStartupResult>();
        if (threadManager.getStateOfUnmanagedThread(threadName) != ThreadStatus.ThreadState.RUNNING) {
            createTopicIfNotExists(topicName, 1, kafkaConsumerProps);
            ThreadStartupResult startupResult = threadManager.startUnmanagedThread(threadName, new KafkaRuntimeConsumer(runtime, topicName, kafkaConsumerProps, new DiscoveryServiceStarter()));
            result.add(startupResult);
        } else {
            result.add(alreadyRunning(threadName));
        }
        return result;
    }

    /**
     * Starts one {@link KafkaQueueConsumer} thread per partition, named
     * {@code threadName + "_" + partition}. Partitions whose thread is already running get a
     * placeholder result carrying that same per-partition name.
     *
     * @throws RuntimeException if the number of processors differs from the partition count
     */
    private List<ThreadStartupResult> scheduleConsumerThreads(String topicName, int partitionCount, Properties kafkaConsumerProps, String threadName,
            List<QueueMessageProcessor> processors) {
        if (processors.size() != partitionCount) {
            final String msg = "The number of processors must be equal to the partition count in order to support parallel processing";
            logger.warning(msg);
            throw new RuntimeException(msg);
        }
        createTopicIfNotExists(topicName, partitionCount, kafkaConsumerProps);

        List<ThreadStartupResult> result = new ArrayList<ThreadStartupResult>();
        for (int no = 0; no < partitionCount; no++) {
            final String partitionThreadName = threadName + "_" + no;
            if (threadManager.getStateOfUnmanagedThread(partitionThreadName) == ThreadStatus.ThreadState.RUNNING) {
                // Report the name the thread is actually registered under so that stop() can find it.
                result.add(alreadyRunning(partitionThreadName));
                continue;
            }
            QueueMessageProcessor processor = processors.get(no);
            ThreadStartupResult created = threadManager.startUnmanagedThread(partitionThreadName, new KafkaQueueConsumer(topicName, kafkaConsumerProps, processor));
            if (created.isNewThreadCreated()) {
                logger.log(Level.INFO, "Created new consumer thread on topic ''{0}'' with group ID ''{1}'', thread name: ''{2}'', properties: ''{3}''",
                        new Object[] { topicName, kafkaConsumerProps.getProperty("group.id"), partitionThreadName, kafkaConsumerProps.toString() });
            } else {
                logger.log(Level.FINE, "Consumer thread with thread name: ''{0}'' already exists, doing nothing", new Object[] { partitionThreadName });
            }
            result.add(created);
        }
        return result;
    }

    /** Single-partition convenience wrapper around {@link #scheduleConsumerThreads}. */
    private ThreadStartupResult scheduleConsumerThread(String topicName, Properties kafkaConsumerProps, String threadName, QueueMessageProcessor processor) {
        return scheduleConsumerThreads(topicName, 1, kafkaConsumerProps, threadName, Arrays.asList(processor)).get(0);
    }

    /**
     * Puts the tracker on the topic of the runtime that hosts its current discovery service.
     *
     * @throws RuntimeException if the tracker has no current start request or no runtime is
     *                          found for the discovery service
     */
    @Override
    public void enqueue(AnalysisRequestTracker tracker) {
        DiscoveryServiceRequest dsRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
        if (dsRequest == null) {
            throw new RuntimeException("Tracker is finished, should not be enqueued");
        }
        String dsID = dsRequest.getDiscoveryServiceId();
        dsRequest.setPutOnRequestQueue(System.currentTimeMillis());
        ServiceRuntime runtime = ServiceRuntimes.getRuntimeForDiscoveryService(dsID);
        if (runtime == null) {
            throw new RuntimeException(MessageFormat.format("Service runtime for service ''{0}'' was not found.", dsID));
        }
        enqueueJSONMessage(getTopicName(runtime), tracker, tracker.getRequest().getId());
    }

    /**
     * Serializes the object to JSON and sends it with the given key to the topic.
     *
     * @throws RuntimeException wrapping the {@link JSONException} if serialization fails
     */
    private void enqueueJSONMessage(String topicName, Object jsonObject, String key) {
        final String value;
        try {
            value = JSONUtils.toJSON(jsonObject);
        } catch (JSONException e) {
            throw new RuntimeException(e);
        }
        new ODFInternalFactory().create(KafkaProducerManager.class).sendMsg(topicName, key, value);
    }

    /**
     * Starts the status watcher consumer on the status topic unless it is already running.
     */
    List<ThreadStartupResult> scheduleStatusQueueConsumers() {
        logger.log(Level.FINER, "Create consumers on status queue if they don't already exist");
        List<ThreadStartupResult> results = new ArrayList<ThreadStartupResult>();

        // create consumer thread for the status watcher of all trackers
        String statusWatcherConsumerGroupID = "DSStatusWatcherConsumerGroup" + UNIQUE_SESSION_THREAD_ID; // have a new group id on each node that reads all from the beginning
        // always read from beginning for the status queue
        Properties statusWatcherKafkaConsumerProps = getConsumerConfigProperties(statusWatcherConsumerGroupID, false);
        final String statusWatcherThreadName = "StatusWatcher" + TOPIC_NAME_STATUS_QUEUE; // a fixed name
        String threadNameWithPartition = statusWatcherThreadName + "_0";
        final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(threadNameWithPartition);
        logger.fine("State of status watcher thread: " + stateOfUnmanagedThread);
        if (stateOfUnmanagedThread != ThreadStatus.ThreadState.RUNNING) {
            results.add(scheduleConsumerThread(TOPIC_NAME_STATUS_QUEUE, statusWatcherKafkaConsumerProps, statusWatcherThreadName,
                    new StatusQueueProcessor()));
        } else {
            // Use the per-partition name that was checked above, not the base name.
            results.add(alreadyRunning(threadNameWithPartition));
        }

        return results;
    }

    @Override
    public void enqueueInStatusQueue(StatusQueueEntry sqe) {
        enqueueJSONMessage(TOPIC_NAME_STATUS_QUEUE, sqe, StatusQueueEntry.getRequestId(sqe));
    }

    /**
     * Starts the two admin topic consumers unless the per-node admin watcher is already
     * running: one in a per-session consumer group (every node receives events) and one in
     * a common consumer group (only one node receives events).
     */
    private List<ThreadStartupResult> scheduleAdminQueueConsumers() {
        List<ThreadStartupResult> results = new ArrayList<ThreadStartupResult>();
        //schedule admin queue consumers
        // consumer group so that every node receives events
        String adminWatcherConsumerGroupID = "DSAdminQueueConsumerGroup" + UNIQUE_SESSION_THREAD_ID; // have a new group id on each node
        Properties adminWatcherKafkaConsumerProps = getConsumerConfigProperties(adminWatcherConsumerGroupID, true);
        final String adminWatcherThreadName = "AdminWatcher" + TOPIC_NAME_ADMIN_QUEUE;
        String threadNameWithPartition = adminWatcherThreadName + "_0";
        if (threadManager.getStateOfUnmanagedThread(threadNameWithPartition) != ThreadStatus.ThreadState.RUNNING) {
            results.add(scheduleConsumerThread(TOPIC_NAME_ADMIN_QUEUE, adminWatcherKafkaConsumerProps, adminWatcherThreadName, new AdminQueueProcessor()));
            // consumer group so only one node receives events
            String distributedAdminConsumerGroup = "DSAdminQueueConsumerGroupCommon";
            Properties kafkaProps = getConsumerConfigProperties(distributedAdminConsumerGroup, true);
            final String threadName = "DistributedAdminWatcher";
            results.add(scheduleConsumerThread(TOPIC_NAME_ADMIN_QUEUE, kafkaProps, threadName, new ConfigChangeQueueProcessor()));
        } else {
            // Use the per-partition name that was checked above, not the base name.
            results.add(alreadyRunning(threadNameWithPartition));
        }
        return results;
    }

    @Override
    public void enqueueInAdminQueue(AdminMessage message) {
        enqueueJSONMessage(TOPIC_NAME_ADMIN_QUEUE, message, ADMIN_QUEUE_KEY);
    }

    /**
     * Starts all consumers once; later calls are no-ops until {@link #stop()} is called.
     * The thread ids are recorded before waiting for readiness, so they stay recorded even
     * when the wait times out.
     *
     * @throws TimeoutException propagated from the thread manager's readiness wait
     */
    @Override
    public void start() throws TimeoutException {
        synchronized (startLock) {
            if (queueConsumerNames == null) {
                List<ThreadStartupResult> results = new ArrayList<>();
                results.addAll(scheduleStatusQueueConsumers());
                results.addAll(scheduleAdminQueueConsumers());
                results.addAll(scheduleAllRuntimeConsumers());
                results.addAll(scheduleNotificationListenerThreads());
                List<String> consumerNames = new ArrayList<>();
                for (ThreadStartupResult tsr : results) {
                    consumerNames.add(tsr.getThreadId());
                }
                queueConsumerNames = consumerNames;
                this.threadManager.waitForThreadsToBeReady(THREAD_STARTUP_TIMEOUT_MS * results.size(), results);
                logger.info("KafkaQueueManager successfully initialized");
            }
        }
    }

    /** Shuts down the threads recorded by {@link #start()}, if any, and forgets them. */
    public void stop() {
        synchronized (startLock) {
            if (queueConsumerNames != null) {
                threadManager.shutdownThreads(queueConsumerNames);
                queueConsumerNames = null;
            }
        }
    }

    /**
     * Collects broker and per-topic status for the admin topic, the status topic and one
     * topic per registered discovery service.
     */
    @Override
    public MessagingStatus getMessagingStatus() {
        KafkaStatus status = new KafkaStatus();
        KafkaMonitor monitor = new ODFInternalFactory().create(KafkaMonitor.class);
        status.setBrokers(monitor.getBrokers(zookeeperConnectString));

        List<String> topics = new ArrayList<String>(Arrays.asList(KafkaQueueManager.TOPIC_NAME_ADMIN_QUEUE, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE));
        // NOTE(review): enqueue() writes to "odf-runtime-<runtime name>" topics, whereas the names
        // built here are SERVICE_TOPIC_PREFIX + <service id> -- confirm which topics should be reported.
        for (DiscoveryServiceProperties info : new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties()) {
            topics.add(KafkaQueueManager.SERVICE_TOPIC_PREFIX + info.getId());
        }

        List<KafkaTopicStatus> topicStatusList = new ArrayList<KafkaTopicStatus>();
        for (String topic : topics) {
            topicStatusList.add(getTopicStatus(topic, monitor));
        }
        status.setTopicStatus(topicStatusList);
        return status;
    }

    /**
     * Builds the status of one topic: message counts, partition/broker info and the offsets
     * of every consumer group that has an offset on at least one partition.
     */
    private KafkaTopicStatus getTopicStatus(String topic, KafkaMonitor monitor) {
        KafkaTopicStatus topicStatus = new KafkaTopicStatus();
        topicStatus.setTopic(topic);
        topicStatus.setBrokerPartitionMessageInfo(monitor.getMessageCountForTopic(zookeeperConnectString, topic));

        List<KafkaGroupOffsetInfo> offsetInfoList = new ArrayList<KafkaGroupOffsetInfo>();
        List<String> consumerGroupsFromZookeeper = monitor.getConsumerGroups(zookeeperConnectString, topic);
        for (String group : consumerGroupsFromZookeeper) {
            KafkaGroupOffsetInfo offsetInfoContainer = new KafkaGroupOffsetInfo();
            offsetInfoContainer.setGroupId(group);
            List<PartitionOffsetInfo> offsetsForTopic = monitor.getOffsetsForTopic(zookeeperConnectString, group, topic);
            for (PartitionOffsetInfo info : offsetsForTopic) {
                // to reduce clutter, only if at least 1 partition has an offset > -1 (== any offset) for this consumer group,
                // it will be included in the result
                if (info.getOffset() > -1) {
                    offsetInfoContainer.setOffsets(offsetsForTopic);
                    offsetInfoList.add(offsetInfoContainer);
                    break;
                }
            }
        }
        topicStatus.setConsumerGroupOffsetInfo(offsetInfoList);

        topicStatus.setPartitionBrokersInfo(monitor.getPartitionInfoForTopic(zookeeperConnectString, topic));
        return topicStatus;
    }

    /**
     * Starts one consumer thread per registered {@link NotificationListener} (unless it is
     * already running) that forwards every message to the listener. Exceptions thrown by a
     * listener are logged and ignored.
     */
    private List<ThreadStartupResult> scheduleNotificationListenerThreads() {
        NotificationManager nm = new ODFInternalFactory().create(NotificationManager.class);
        List<NotificationListener> listeners = nm.getListeners();
        List<ThreadStartupResult> result = new ArrayList<>();
        if (listeners == null) {
            return result;
        }
        final OpenDiscoveryFramework odf = new ODFFactory().create();
        for (final NotificationListener listener : listeners) {
            String topicName = listener.getTopicName();
            String consumerGroupId = "ODFNotificationGroup" + topicName;
            Properties kafkaConsumerProps = getConsumerConfigProperties(consumerGroupId, true);
            String threadName = "NotificationListenerThread" + topicName;
            if (threadManager.getStateOfUnmanagedThread(threadName) != ThreadStatus.ThreadState.RUNNING) {
                KafkaQueueConsumer consumer = new KafkaQueueConsumer(topicName, kafkaConsumerProps, new QueueMessageProcessor() {

                    @Override
                    public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
                        try {
                            listener.onEvent(msg, odf);
                        } catch (Exception exc) {
                            String errorMsg = MessageFormat.format("Notification listsner ''{0}'' has thrown an exception. Ignoring it", listener.getName());
                            logger.log(Level.WARNING, errorMsg, exc);
                        }
                    }
                });
                ThreadStartupResult startupResult = threadManager.startUnmanagedThread(threadName, consumer);
                result.add(startupResult);
            } else {
                result.add(alreadyRunning(threadName));
            }
        }
        return result;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java ---------------------------------------------------------------------- diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java new file mode 100755 index 0000000..73d98e7 --- /dev/null +++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java @@ -0,0 +1,104 @@
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.odf.core.messaging.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;

/**
 * This consumer is started for a certain runtime and starts a KafkaQueueConsumer if
 * the runtime is available.
 *
 * <p>While the runtime reports a positive wait time, the thread sleeps for that time and
 * checks again. Once the runtime is available, a {@link KafkaQueueConsumer} is run
 * synchronously until it is cancelled or the runtime becomes unavailable again.
 *
 * <p>{@link #cancel()} is expected to be called from a different thread than
 * {@link #run()}, therefore the shared state is {@code volatile}.
 */
public class KafkaRuntimeConsumer implements ODFRunnable {

    Logger logger = Logger.getLogger(KafkaRuntimeConsumer.class.getName());

    private final ServiceRuntime runtime;
    // Written by cancel(), read by run() and the consumption callback.
    private volatile boolean isShutdown = false;
    private ExecutorService executorService = null;
    // Written by run(), read by cancel(); null while no Kafka consumer is active.
    private volatile KafkaQueueConsumer kafkaQueueConsumer = null;

    private final String topic;
    private final Properties config;
    private final QueueMessageProcessor processor;

    // Stops the inner consumer when this runnable is cancelled or the runtime is unavailable.
    private final KafkaQueueConsumer.ConsumptionCallback callback = new KafkaQueueConsumer.ConsumptionCallback() {
        @Override
        public boolean stopConsumption() {
            return isShutdown || (runtime.getWaitTimeUntilAvailable() > 0);
        }
    };

    /**
     * @param runtime   runtime whose availability gates consumption
     * @param topicName topic the inner consumer reads from
     * @param config    Kafka consumer configuration handed to the inner consumer
     * @param processor processor handed to the inner consumer
     */
    public KafkaRuntimeConsumer(ServiceRuntime runtime, String topicName, Properties config, QueueMessageProcessor processor) {
        this.runtime = runtime;
        this.processor = processor;
        this.topic = topicName;
        this.config = config;
    }

    /**
     * Loops until {@link #cancel()} is called.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *                          interrupted while waiting for the runtime; the interrupt
     *                          status is restored before throwing
     */
    @Override
    public void run() {
        logger.log(Level.INFO, "Starting runtime consumer for topic ''{0}''", topic);
        while (!isShutdown) {
            long waitTime = runtime.getWaitTimeUntilAvailable();
            if (waitTime <= 0) {
                logger.log(Level.INFO, "Starting Kafka consumer for topic ''{0}''", topic);
                KafkaQueueConsumer consumer = new KafkaQueueConsumer(topic, config, processor, callback);
                consumer.setExecutorService(executorService);
                kafkaQueueConsumer = consumer;
                // run consumer synchronously
                consumer.run();
                logger.log(Level.INFO, "Kafka consumer for topic ''{0}'' is finished", topic);

                // if we are here, this means that the consumer was cancelled
                // either directly or (more likely) through the Consumption callback
                kafkaQueueConsumer = null;
            } else {
                try {
                    logger.log(Level.FINER, "Runtime ''{0}'' is not available, waiting for ''{1}''ms", new Object[]{runtime.getName(), waitTime });
                    Thread.sleep(waitTime);
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still see the interrupt.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
        logger.log(Level.INFO, "Kafka runtime consumer for topic ''{0}'' has shut down", topic);
    }

    @Override
    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    /** Requests shutdown and cancels the inner Kafka consumer if one is currently active. */
    @Override
    public void cancel() {
        isShutdown = true;
        // Snapshot the field: run() may set it to null between the check and the call.
        KafkaQueueConsumer consumer = kafkaQueueConsumer;
        if (consumer != null) {
            consumer.cancel();
        }
    }

    @Override
    public boolean isReady() {
        return true;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java ---------------------------------------------------------------------- diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java new file mode 100755 index 0000000..9c08f3a --- /dev/null +++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java @@ -0,0 +1,224 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.messaging.kafka; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.I0Itec.zkclient.exception.ZkTimeoutException; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import org.apache.atlas.odf.core.Environment; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.core.controlcenter.ODFRunnable; +import org.apache.atlas.odf.api.engine.PartitionOffsetInfo; + +public class MessageSearchConsumer implements ODFRunnable { + private static final long POLLING_DURATION_MS = 100; + private static final int MAX_POLL_COUNT = 5; + + private Logger logger = Logger.getLogger(MessageSearchConsumer.class.getName()); + private SearchCompletedCallback searchCompletedCallback; + private List<String> searchStrings; + protected String topic; + private KafkaConsumer<String, String> kafkaConsumer; + private boolean shutdown; + private boolean ready = false; + private List<PartitionOffsetInfo> maxOffsetsForTopic = new ArrayList<PartitionOffsetInfo>(); + + + public MessageSearchConsumer(String topic, SearchCompletedCallback completitionCallback, List<String> searchStrings) { + setTopic(topic); + setSearchStrings(searchStrings); + setCompletitionCallback(completitionCallback); + } + + public MessageSearchConsumer() { + } + + protected List<PartitionOffsetInfo> retrieveTopicOffsets() { + List<PartitionOffsetInfo> offsetsForTopic = new 
ArrayList<PartitionOffsetInfo>(); + String zookeeperConnect = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString(); + + if (zookeeperConnect != null) { + final KafkaMonitor create = new ODFInternalFactory().create(KafkaMonitor.class); + for (int part : create.getPartitionsForTopic(zookeeperConnect, this.topic)) { + offsetsForTopic.add(create.getOffsetsOfLastMessagesForTopic(zookeeperConnect, this.topic, part)); + } + } + return offsetsForTopic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public void setSearchStrings(List<String> searchStrings) { + this.searchStrings = searchStrings; + } + + public void setCompletitionCallback(SearchCompletedCallback completitionCallback) { + this.searchCompletedCallback = completitionCallback; + } + + protected Properties getKafkaConsumerProperties() { + Properties consumerProperties = new ODFFactory().create().getSettingsManager().getKafkaConsumerProperties(); + consumerProperties.put("group.id", UUID.randomUUID().toString() + "_searchConsumer"); + final String zookeeperConnect = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString(); + consumerProperties.put("zookeeper.connect", zookeeperConnect); + consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperConnect).iterator(); + StringBuilder brokersString = new StringBuilder(); + while (brokers.hasNext()) { + brokersString.append(brokers.next()); + if (brokers.hasNext()) { + brokersString.append(","); + } + } + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokersString.toString()); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + return consumerProperties; + } + + @Override + public void run() { + this.maxOffsetsForTopic = retrieveTopicOffsets(); + final String logPrefix = "Consumer for topic " + topic + ": "; + try { + + Map<Integer, Boolean> maxOffsetReachedMap = new HashMap<Integer, Boolean>(); + if (maxOffsetsForTopic.isEmpty()) { + logger.info("No offsets found for topic " + this.topic + ", therefore no matching messages can be found"); + if (searchCompletedCallback != null) { + searchCompletedCallback.onDoneSearching(new HashMap<String, PartitionOffsetInfo>()); + return; + } + } + for (PartitionOffsetInfo info : maxOffsetsForTopic) { + //if the max offset is -1, no message exists on the partition + if (info.getOffset() > -1) { + maxOffsetReachedMap.put(info.getPartitionId(), false); + } + } + + Map<String, PartitionOffsetInfo> resultMap = new HashMap<String, PartitionOffsetInfo>(); + + Properties consumerProperties = getKafkaConsumerProperties(); + + if (this.kafkaConsumer == null) { + logger.fine(logPrefix + " create new consumer for topic " + topic); + try { + this.kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties); + //In order to prevent other consumers from getting assigned this partition during a rebalance, the partition(s) MUST be assigned manually (not using auto assign because of subscribe()) + kafkaConsumer.subscribe(Arrays.asList(topic)); + } catch (ZkTimeoutException zkte) { + String zkHosts = consumerProperties.getProperty("zookeeper.connect"); + logger.log(Level.SEVERE, logPrefix + " Could not connect to the Zookeeper instance at ''{0}''. 
Please ensure that Zookeeper is running", zkHosts); + throw zkte; + } + } + logger.log(Level.INFO, logPrefix + " Consumer " + "''{1}'' is now listening on ODF queue ''{0}'' with configuration {2}", + new Object[] { topic, kafkaConsumer, consumerProperties }); + + int pollCount = 0; + while (!Thread.interrupted() && pollCount < MAX_POLL_COUNT && !shutdown && kafkaConsumer != null) { + logger.info("searching ..."); + pollCount++; + ConsumerRecords<String, String> records = kafkaConsumer.poll(POLLING_DURATION_MS); + ready = true; + final Iterator<ConsumerRecord<String, String>> polledRecords = records.records(topic).iterator(); + + while (polledRecords.hasNext() && !shutdown) { + final ConsumerRecord<String, String> next = polledRecords.next(); + for (String s : searchStrings) { + if ((next.key() != null && next.key().equals(s)) || (next.value() != null && next.value().contains(s))) { + final PartitionOffsetInfo position = new PartitionOffsetInfo(); + position.setOffset(next.offset()); + position.setPartitionId(next.partition()); + resultMap.put(s, position); + } + } + + if (next.offset() == maxOffsetsForTopic.get(next.partition()).getOffset()) { + maxOffsetReachedMap.put(next.partition(), true); + } + + boolean allCompleted = true; + for (Entry<Integer, Boolean> entry : maxOffsetReachedMap.entrySet()) { + if (!entry.getValue()) { + allCompleted = false; + break; + } + } + + if (allCompleted) { + logger.info("Done searching all messages"); + if (searchCompletedCallback != null) { + searchCompletedCallback.onDoneSearching(resultMap); + return; + } + shutdown = true; + } + } + } + } catch (Exception exc) { + String msg = MessageFormat.format(" Caught exception on queue ''{0}''", topic); + logger.log(Level.WARNING, logPrefix + msg, exc); + } finally { + if (kafkaConsumer != null) { + logger.log(Level.FINE, logPrefix + "Closing consumer " + " on topic ''{0}''", topic); + kafkaConsumer.close(); + logger.log(Level.FINE, logPrefix + "Closed consumer " + " on topic ''{0}''", 
topic); + kafkaConsumer = null; + } + } + logger.log(Level.FINE, logPrefix + "Finished consumer on topic ''{0}''", topic); + } + + @Override + public void setExecutorService(ExecutorService service) { + + } + + @Override + public void cancel() { + this.shutdown = true; + } + + @Override + public boolean isReady() { + return ready; + } + + public interface SearchCompletedCallback { + void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties ---------------------------------------------------------------------- diff --git a/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties new file mode 100755 index 0000000..95c1f71 --- /dev/null +++ b/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties @@ -0,0 +1,14 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +DiscoveryServiceQueueManager=org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java new file mode 100755 index 0000000..396193f --- /dev/null +++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java @@ -0,0 +1,137 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.messaging.kafka; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.I0Itec.zkclient.exception.ZkTimeoutException; +import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.atlas.odf.api.engine.ThreadStatus.ThreadState; +import org.apache.atlas.odf.core.Environment; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor; +import org.apache.atlas.odf.core.controlcenter.ThreadManager; +import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor; +import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager; +import org.apache.atlas.odf.api.settings.SettingsManager; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.core.test.ODFTestcase; + +import kafka.admin.AdminUtils; +import kafka.common.TopicExistsException; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + +public class KafkaQueueConsumerExceptionTest extends ODFTestcase { + static Logger logger = ODFTestLogger.get(); + static final String topicName = "my_dummy_test_topic"; + static String zookeeperHost = new 
ODFInternalFactory().create(Environment.class).getZookeeperConnectString(); + + @BeforeClass + public static void setupTopic() { + ZkClient zkClient = null; + try { + zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$); + logger.log(Level.FINEST, "Checking if topic ''{0}'' already exists", topicName); + // using partition size 1 and replication size 1, no special + // per-topic config needed + logger.log(Level.FINE, "Topic ''{0}'' does not exist, creating it", topicName); + //FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts? + AdminUtils.createTopic(new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false), topicName, 1, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE); + logger.log(Level.FINE, "Topic ''{0}'' created", topicName); + } catch (TopicExistsException ex) { + logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName); + } catch (ZkTimeoutException zkte) { + logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. 
Please ensure that Zookeeper is running", zookeeperHost); + } finally { + if (zkClient != null) { + zkClient.close(); + } + } + } + + @Test + public void testExceptionAndRetryDuringProcessing() throws InterruptedException, ExecutionException, TimeoutException { + final ODFInternalFactory odfFactory = new ODFInternalFactory(); + final String groupId = "retrying-exception-dummy-consumer"; + Properties kafkaConsumerProperties = new KafkaQueueManager().getConsumerConfigProperties(groupId, true); + kafkaConsumerProperties.put("group.id", groupId); + final List<String> consumedMsgs1 = new ArrayList<String>(); + KafkaQueueConsumer cnsmr = new KafkaQueueConsumer(topicName, kafkaConsumerProperties, new QueueMessageProcessor() { + + @Override + public void process(ExecutorService executorService, String msg, int partition, long offset) { + consumedMsgs1.add(msg); + logger.info("retry_consumer process " + msg + " throw exception and try again"); + throw new RuntimeException("Oops!"); + } + }); + + final ThreadManager threadManager = odfFactory.create(ThreadManager.class); + final String consumerThread = "TEST_CONSUMER_RETRY_RUNNING"; + threadManager.waitForThreadsToBeReady(10000, Arrays.asList(threadManager.startUnmanagedThread(consumerThread, cnsmr))); + + sendMsg("TEST_MSG"); + sendMsg("TEST_MSG2"); + + Thread.sleep(2000); + + Assert.assertEquals(2 * KafkaQueueConsumer.MAX_PROCESSING_EXCEPTIONS, consumedMsgs1.size()); + + final ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(consumerThread); + Assert.assertEquals(ThreadState.RUNNING, stateOfUnmanagedThread); + } + + void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException { + SettingsManager odfConfig = new ODFFactory().create().getSettingsManager(); + + Properties props = odfConfig.getKafkaProducerProperties(); + final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator(); + StringBuilder 
brokersString = new StringBuilder(); + while (brokers.hasNext()) { + brokersString.append(brokers.next()); + if (brokers.hasNext()) { + brokersString.append(","); + } + } + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString()); + + final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); + ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, UUID.randomUUID().toString(), msg); + producer.send(producerRecord).get(3000, TimeUnit.MILLISECONDS); + producer.close(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java new file mode 100755 index 0000000..cff538c --- /dev/null +++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java @@ -0,0 +1,303 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.messaging.kafka; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.settings.MessagingConfiguration; +import org.apache.atlas.odf.api.settings.ODFSettings; +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.wink.json4j.JSONException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS; +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.engine.ThreadStatus.ThreadState; +import org.apache.atlas.odf.api.settings.SettingsManager; +import org.apache.atlas.odf.core.Environment; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore; +import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore; +import org.apache.atlas.odf.core.controlcenter.DefaultThreadManager; +import 
org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor; +import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry; +import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult; +import org.apache.atlas.odf.core.controlcenter.TrackerUtil; +import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor; +import org.apache.atlas.odf.core.messaging.kafka.KafkaProducerManager; +import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager; +import org.apache.atlas.odf.core.test.ODFTestBase; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.json.JSONUtils; + +public class KafkaQueueManagerTest extends ODFTestBase { + + private static Long origRetention; + Logger logger = ODFTestLogger.get(); + String zookeeperConnectString = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString(); + + @BeforeClass + public static void setupTrackerRetention() throws ValidationException { + SettingsManager settingsManager = new ODFFactory().create().getSettingsManager(); + //SETUP RETENTION TO KEEP TRACKERS!!! 
+ final MessagingConfiguration messagingConfiguration = settingsManager.getODFSettings().getMessagingConfiguration(); + origRetention = messagingConfiguration.getAnalysisRequestRetentionMs(); + messagingConfiguration.setAnalysisRequestRetentionMs(120000000l); + + ODFTestLogger.get().info("Set request retention to " + settingsManager.getODFSettings().getMessagingConfiguration().getAnalysisRequestRetentionMs()); + } + + @AfterClass + public static void cleanupTrackerRetention() throws ValidationException { + SettingsManager settingsManager = new ODFFactory().create().getSettingsManager(); + ODFSettings settings = settingsManager.getODFSettings(); + settings.getMessagingConfiguration().setAnalysisRequestRetentionMs(origRetention); + settingsManager.updateODFSettings(settings); + } + + @Test + public void testStatusQueue() throws Exception { + KafkaQueueManager kqm = new KafkaQueueManager(); + + logger.info("Queue manager created"); + AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null); + + long before = System.currentTimeMillis(); + tracker.setLastModified(before); + int maxEntries = 10; + for (int i = 0; i < maxEntries; i++) { + tracker.getRequest().setId("id" + i); + StatusQueueEntry sqe = new StatusQueueEntry(); + sqe.setAnalysisRequestTracker(tracker); + kqm.enqueueInStatusQueue(sqe); + + // System.out.println("tracker "+i+" enqueued in status queue"); + } + long after = System.currentTimeMillis(); + logger.info("Time for enqueueing " + maxEntries + " objects: " + (after - before) + ", " + ((after - before) / maxEntries) + "ms per object"); + Thread.sleep(100 * maxEntries); + + AnalysisRequestTrackerStore store = new DefaultStatusQueueStore(); + + for (int i = 0; i < maxEntries; i++) { + logger.info("Querying status " + i); + AnalysisRequestTracker queriedTracker = store.query("id" + i); + Assert.assertNotNull(queriedTracker); + 
Assert.assertEquals(STATUS.FINISHED, queriedTracker.getStatus()); + } + + // Thread.sleep(5000); + // Assert.fail("you fail"); + logger.info("Test testEnqueueStatusQueue finished"); + } + + /** + * This test creates a tracker, puts it on the status queue, kills the service consumer and creates a new dummy consumer to put the offset of the service consumer behind the new tracker + * Then the status consumer is shut down and its offset is reset in order to make it consume from the start again and thereby cleaning up stuck processes + * Then kafka queue manager is re-initialized, causing all consumers to come up and triggering the cleanup process + */ + @Test + @Ignore("Adjust once ServiceRuntimes are fully implemented") + public void testStuckRequestCleanup() throws JSONException, InterruptedException, ExecutionException, TimeoutException { + final AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", + null); + tracker.setStatus(STATUS.IN_DISCOVERY_SERVICE_QUEUE); + tracker.setNextDiscoveryServiceRequest(0); + tracker.setLastModified(System.currentTimeMillis()); + final String newTrackerId = "KAFKA_QUEUE_MANAGER_09_TEST" + UUID.randomUUID().toString(); + tracker.getRequest().setId(newTrackerId); + DiscoveryServiceRequest dsRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker); + final DiscoveryServiceProperties discoveryServiceRegistrationInfo = new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties() + .get(0); + dsRequest.setDiscoveryServiceId(discoveryServiceRegistrationInfo.getId()); + String dsID = dsRequest.getDiscoveryServiceId(); + String topicName = KafkaQueueManager.SERVICE_TOPIC_PREFIX + dsID; + //Add tracker to queue, set offset behind request so that it should be cleanup + + String consumerGroupId = "odf-topic-" + dsID + "_group"; + String threadName = "Dummy_DiscoveryServiceQueueConsumer" + 
topicName; + + final List<Throwable> multiThreadErrors = new ArrayList<Throwable>(); + final DefaultThreadManager tm = new DefaultThreadManager(); + logger.info("shutdown old test 09 consumer and replace with fake doing nothing"); + for (int no = 0; no < discoveryServiceRegistrationInfo.getParallelismCount(); no++) { + tm.shutdownThreads(Collections.singletonList("DiscoveryServiceQueueConsumer" + topicName + "_" + no)); + } + Properties kafkaConsumerProps = getKafkaConsumerConfigProperties(consumerGroupId); + + final long[] producedMsgOffset = new long[1]; + + final CountDownLatch msgProcessingLatch = new CountDownLatch(1); + ThreadStartupResult created = tm.startUnmanagedThread(threadName, new KafkaQueueConsumer(topicName, kafkaConsumerProps, new QueueMessageProcessor() { + + @Override + public void process(ExecutorService executorService, String msg, int partition, long msgOffset) { + logger.info("Dequeue without processing " + msgOffset); + if (msgOffset == producedMsgOffset[0]) { + try { + msgProcessingLatch.countDown(); + } catch (Exception e) { + msgProcessingLatch.countDown(); + multiThreadErrors.add(e); + } + } + } + + })); + + tm.waitForThreadsToBeReady(30000, Arrays.asList(created)); + + String key = tracker.getRequest().getId(); + String value = JSONUtils.toJSON(tracker); + + new DefaultStatusQueueStore().store(tracker); + + KafkaMonitor kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class); + List<String> origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE); + logger.info("Found status consumers: " + origQueueConsumers.toString() + ", shutting down StatusWatcher"); + + //kill status queue watcher so that it is restarted when queue manager is initialized and detects stuck requests + tm.shutdownThreads(Collections.singletonList("StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0")); + + int maxWaitForConsumerDeath = 60; + while 
(tm.getStateOfUnmanagedThread("StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0") != ThreadState.NON_EXISTENT + || tm.getStateOfUnmanagedThread("StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0") != ThreadState.FINISHED && maxWaitForConsumerDeath > 0) { + maxWaitForConsumerDeath--; + Thread.sleep(500); + } + + logger.info("Only 1 consumer left? " + maxWaitForConsumerDeath + " : " + tm.getStateOfUnmanagedThread("StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0")); + logger.info(" set offset for status consumer to beginning so that it consumes from when restarting"); + final int offset = 1000000; + for (String statusConsumerGroup : origQueueConsumers) { + if (statusConsumerGroup.contains("DSStatusWatcherConsumerGroup")) { + boolean success = false; + int retryCount = 0; + final int maxOffsetRetry = 20; + while (!success && retryCount < maxOffsetRetry) { + success = kafkaMonitor.setOffset(zookeeperConnectString, statusConsumerGroup, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE, 0, offset); + retryCount++; + Thread.sleep(500); + } + + Assert.assertNotEquals(retryCount, maxOffsetRetry); + Assert.assertTrue(success); + } + } + + new ODFInternalFactory().create(KafkaProducerManager.class).sendMsg(topicName, key, value, new Callback() { + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + producedMsgOffset[0] = metadata.offset(); + } + }); + + final boolean await = msgProcessingLatch.await(240, TimeUnit.SECONDS); + Assert.assertTrue(await); + if (await) { + logger.info("run after message consumption..."); + AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class); + AnalysisRequestTracker storeTracker = store.query(tracker.getRequest().getId()); + Assert.assertEquals(tracker.getRequest().getId(), storeTracker.getRequest().getId()); + Assert.assertEquals(STATUS.IN_DISCOVERY_SERVICE_QUEUE, storeTracker.getStatus()); + + //start odf and cleanup 
here... + logger.info("shutdown all threads and restart ODF"); + tm.shutdownAllUnmanagedThreads(); + + int threadKillRetry = 0; + while (tm.getNumberOfRunningThreads() > 0 && threadKillRetry < 20) { + Thread.sleep(500); + threadKillRetry++; + } + + logger.info("All threads down, restart ODF " + threadKillRetry); + + // Initialize analysis manager + new ODFFactory().create().getAnalysisManager(); + + kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class); + origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE); + int healthRetrieveRetry = 0; + //wait for max of 40 secs for status consumer to come up. If it is, we can continue because ODF is restarted successfully + while (origQueueConsumers.isEmpty() && healthRetrieveRetry < 240) { + healthRetrieveRetry++; + Thread.sleep(500); + origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE); + } + Assert.assertNotEquals(healthRetrieveRetry, 240); + + logger.info("initialized, wait for cleanup ... 
" + healthRetrieveRetry); + Thread.sleep(5000); + logger.info("Found health consumers: " + origQueueConsumers.toString()); + logger.info("hopefully cleaned up ..."); + AnalysisRequestTracker storedTracker = store.query(tracker.getRequest().getId()); + Assert.assertEquals(STATUS.ERROR, storedTracker.getStatus()); + logger.info("DONE CLEANING UP, ALL FINE"); + } + + Assert.assertEquals(0, multiThreadErrors.size()); + } + + public Properties getKafkaConsumerConfigProperties(String consumerGroupID) { + SettingsManager odfConfig = new ODFFactory().create().getSettingsManager(); + Properties kafkaConsumerProps = odfConfig.getKafkaConsumerProperties(); + kafkaConsumerProps.put("group.id", consumerGroupID); + if (zookeeperConnectString != null) { + kafkaConsumerProps.put("zookeeper.connect", zookeeperConnectString); + } + + kafkaConsumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + kafkaConsumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + kafkaConsumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + StringBuilder bld = new StringBuilder(); + final Iterator<String> iterator = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperConnectString).iterator(); + while (iterator.hasNext()) { + bld.append(iterator.next()); + if (iterator.hasNext()) { + bld.append(","); + } + } + kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bld.toString()); + kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + return kafkaConsumerProps; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java ---------------------------------------------------------------------- diff --git 
a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java new file mode 100755 index 0000000..35b09e2 --- /dev/null +++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java @@ -0,0 +1,193 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.messaging.kafka; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.atlas.odf.core.messaging.kafka.MessageSearchConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.atlas.odf.core.Environment; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.api.ODFFactory; +import 
org.apache.atlas.odf.core.controlcenter.ThreadManager; +import org.apache.atlas.odf.api.engine.PartitionOffsetInfo; +import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor; +import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager; +import org.apache.atlas.odf.api.settings.SettingsManager; +import org.apache.atlas.odf.core.test.ODFTestBase; +import org.apache.atlas.odf.core.test.ODFTestLogger; + +import kafka.admin.AdminUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + +public class MessageSearchConsumerTest extends ODFTestBase { + private static final String TEST_SEARCH_STRING = "TEST_STRING_" + UUID.randomUUID().toString(); + private static final String TEST_SEARCH_FAILURE_STRING = "TEST_FAILURE_STRING"; + static Logger logger = ODFTestLogger.get(); + final static String topicName = "MessageSearchConsumerTest" + UUID.randomUUID().toString(); + private static final int PERFORMANCE_MSG_COUNT = 1000; + static String zookeeperHost = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString(); + private KafkaProducer<String, String> producer; + + @BeforeClass + public static void createTopc() { + ZkClient zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$); + ZkUtils utils = new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false); + if (!AdminUtils.topicExists(utils, topicName)) { + AdminUtils.createTopic(utils, topicName, 2, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE); + } + } + + @Test + public void testMsgSearchPerformance() throws InterruptedException, ExecutionException, TimeoutException { + logger.info("Producing msgs"); + for (int no = 0; no < PERFORMANCE_MSG_COUNT; no++) { + sendMsg("DUMMY_MSG" + no); + } + sendMsg(TEST_SEARCH_STRING); + logger.info("Done producing ..."); + Thread.sleep(200); + + final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class); + final CountDownLatch searchLatch = new CountDownLatch(1); 
+ threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() { + + @Override + public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) { + logger.info("Done searching " + msgPositionMap.get(TEST_SEARCH_STRING).getOffset()); + Assert.assertTrue(msgPositionMap.get(TEST_SEARCH_STRING).getOffset() > -1); + searchLatch.countDown(); + } + }, Arrays.asList(TEST_SEARCH_STRING))); + + boolean await = searchLatch.await(5, TimeUnit.SECONDS); + if (await) { + logger.info("Messages searched in time"); + } else { + logger.warning("Couldnt finish search in time"); + } + + final CountDownLatch failureSearchLatch = new CountDownLatch(1); + threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() { + + @Override + public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) { + logger.info("Done searching " + msgPositionMap.toString()); + Assert.assertFalse(msgPositionMap.containsKey(TEST_SEARCH_FAILURE_STRING)); + failureSearchLatch.countDown(); + } + }, Arrays.asList(TEST_SEARCH_FAILURE_STRING))); + + await = searchLatch.await(5, TimeUnit.SECONDS); + if (await) { + logger.info("Messages searched in time"); + } else { + logger.warning("Couldnt finish search in time"); + } + } + + @Test + public void testMsgSearchSuccessAndFailure() throws InterruptedException, ExecutionException, TimeoutException { + sendMsg(TEST_SEARCH_STRING); + + Thread.sleep(200); + + final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class); + final CountDownLatch searchLatch = new CountDownLatch(1); + threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() { + + @Override + public void 
onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) { + logger.info("Done searching " + msgPositionMap.get(TEST_SEARCH_STRING).getOffset()); + Assert.assertTrue(msgPositionMap.get(TEST_SEARCH_STRING).getOffset() > -1); + searchLatch.countDown(); + } + }, Arrays.asList(TEST_SEARCH_STRING))); + + boolean await = searchLatch.await(5, TimeUnit.SECONDS); + if (await) { + logger.info("Messages searched in time"); + } else { + logger.warning("Couldnt finish search in time"); + } + + final CountDownLatch failureSearchLatch = new CountDownLatch(1); + threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() { + + @Override + public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) { + logger.info("Done searching " + msgPositionMap); + Assert.assertFalse(msgPositionMap.containsKey(TEST_SEARCH_FAILURE_STRING)); + failureSearchLatch.countDown(); + } + }, Arrays.asList(TEST_SEARCH_FAILURE_STRING))); + + await = searchLatch.await(5, TimeUnit.SECONDS); + if (await) { + logger.info("Messages searched in time"); + } else { + logger.warning("Couldnt finish search in time"); + } + } + + void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException { + final KafkaProducer<String, String> producer = getProducer(); + ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, UUID.randomUUID().toString(), msg); + producer.send(producerRecord).get(15000, TimeUnit.MILLISECONDS); + } + + private KafkaProducer<String, String> getProducer() { + if (this.producer == null) { + SettingsManager odfConfig = new ODFFactory().create().getSettingsManager(); + Properties props = odfConfig.getKafkaProducerProperties(); + final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator(); + StringBuilder brokersString = new StringBuilder(); + 
while (brokers.hasNext()) { + brokersString.append(brokers.next()); + if (brokers.hasNext()) { + brokersString.append(","); + } + } + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString()); + producer = new KafkaProducer<String, String>(props); + } + return producer; + } + + @After + public void closeProducer() { + if (getProducer() != null) { + getProducer().close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java new file mode 100755 index 0000000..f97dd4e --- /dev/null +++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java @@ -0,0 +1,314 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.messaging.kafka; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.I0Itec.zkclient.exception.ZkTimeoutException; +import org.apache.atlas.odf.api.engine.ThreadStatus; +import org.apache.atlas.odf.api.settings.SettingsManager; +import org.apache.atlas.odf.core.Environment; +import org.apache.atlas.odf.core.ODFInitializer; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor; +import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor; +import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer; +import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.core.controlcenter.ThreadManager; +import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.core.test.ODFTestcase; + +import kafka.admin.AdminUtils; +import kafka.common.TopicExistsException; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + +public class MultiPartitionConsumerTest extends ODFTestcase { + static Logger logger = 
ODFTestLogger.get(); + final static String topicName = "my_dummy_test_topic" + UUID.randomUUID().toString(); + static String zookeeperHost = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString(); + static final int PARTITION_COUNT = 3; + private static final int MSG_PER_PARTITION = 5; + private final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class); + + @BeforeClass + public static void setupTopic() { + ZkClient zkClient = null; + try { + zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$); + logger.log(Level.FINEST, "Checking if topic ''{0}'' already exists", topicName); + // using partition size 1 and replication size 1, no special + // per-topic config needed + logger.log(Level.FINE, "Topic ''{0}'' does not exist, creating it", topicName); + //FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts? + AdminUtils.createTopic(new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false), topicName, PARTITION_COUNT, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE); + logger.log(Level.FINE, "Topic ''{0}'' created", topicName); + } catch (TopicExistsException ex) { + logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName); + } catch (ZkTimeoutException zkte) { + logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. 
Please ensure that Zookeeper is running", zookeeperHost); + } finally { + if (zkClient != null) { + zkClient.close(); + } + } + } + + @After + public void cleanupConsumers() { + logger.info("Cleaning up consumers..."); + logger.info("---------------------------------- Stopping ODF..."); + ODFInitializer.stop(); + logger.info("---------------------------------- Starting ODF..."); + ODFInitializer.start(); + logger.info("---------------------------------- ODF started."); + } + + @Test + public void testMultiPartitionDelayedConsumption() throws InterruptedException, ExecutionException { + Properties kafkaConsumerProperties = getConsumerProps(); + final List<String> consumedMsgs = new ArrayList<String>(); + List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>(); + + final String threadPrefix = "TEST_CONSUMER_RETRY_RUNNING_"; + final int processingDelay = 2000; + for (int no = 0; no < PARTITION_COUNT; no++) { + final int currentThread = no; + final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() { + + @Override + public void process(ExecutorService executorService, String msg, int partition, long msgOffset) { + try { + Thread.sleep(processingDelay); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + consumedMsgs.add(msg); + logger.info("process " + msg + " in thread " + currentThread); + } + }; + + KafkaQueueConsumer cnsmr = new KafkaQueueConsumer(topicName, kafkaConsumerProperties, requestConsumer); + + final String consumerThread = threadPrefix + no; + final ThreadStartupResult startUnmanagedThread = threadManager.startUnmanagedThread(consumerThread, cnsmr); + startupList.add(startUnmanagedThread); + } + try { + threadManager.waitForThreadsToBeReady(30000, startupList); + for (int no = 0; no < PARTITION_COUNT; no++) { + for (int msgNo = 0; msgNo < MSG_PER_PARTITION; msgNo++) { + sendMsg("Partition " + no + " msg " + msgNo); + } + } + + int totalWait = 0; + while (totalWait < 
PARTITION_COUNT * MSG_PER_PARTITION * processingDelay + 10000 && consumedMsgs.size() < PARTITION_COUNT * MSG_PER_PARTITION) { + Thread.sleep(2000); + totalWait += 2000; + } + + logger.info("Done with all messages after " + totalWait); + + Assert.assertEquals(PARTITION_COUNT * MSG_PER_PARTITION, consumedMsgs.size()); + + for (int no = 0; no < PARTITION_COUNT; no++) { + final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(threadPrefix + no); + Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, stateOfUnmanagedThread); + } + } catch (TimeoutException e) { + Assert.fail("Consumer could not be started on time"); + } + } + + @Test + public void testMultiPartitionConsumption() throws InterruptedException, ExecutionException { + Properties kafkaConsumerProperties = getConsumerProps(); + final List<String> consumedMsgs = new ArrayList<String>(); + List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>(); + + final String threadPrefix = "TEST_CONSUMER_RETRY_RUNNING_"; + for (int no = 0; no < PARTITION_COUNT; no++) { + final int currentThread = no; + final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() { + + @Override + public void process(ExecutorService executorService, String msg, int partition, long msgOffset) { + consumedMsgs.add(msg); + logger.info("process " + msg + " in thread " + currentThread); + } + }; + + KafkaQueueConsumer cnsmr = new KafkaQueueConsumer(topicName, kafkaConsumerProperties, requestConsumer); + + final String consumerThread = threadPrefix + no; + final ThreadStartupResult startUnmanagedThread = threadManager.startUnmanagedThread(consumerThread, cnsmr); + startupList.add(startUnmanagedThread); + } + try { + threadManager.waitForThreadsToBeReady(30000, startupList); + for (int no = 0; no < PARTITION_COUNT; no++) { + for (int msgNo = 0; msgNo < MSG_PER_PARTITION; msgNo++) { + sendMsg("Partition " + no + " msg " + msgNo); + } + } + + int totalWait = 0; + boolean done 
= false; + while (totalWait < 30 && !done) { + if (consumedMsgs.size() == PARTITION_COUNT * MSG_PER_PARTITION) { + done = true; + } + totalWait++; + Thread.sleep(500); + } + + Assert.assertEquals(PARTITION_COUNT * MSG_PER_PARTITION, consumedMsgs.size()); + + for (int no = 0; no < PARTITION_COUNT; no++) { + final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(threadPrefix + no); + Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, stateOfUnmanagedThread); + } + } catch (TimeoutException e) { + Assert.fail("Consumer could not be started on time"); + } + } + + @Test + public void testMultiPartitionExceptionAndRetryDuringProcessing() throws InterruptedException, ExecutionException { + Properties kafkaConsumerProperties = getConsumerProps(); + final List<String> consumedMsgs = new ArrayList<String>(); + List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>(); + + final String threadPrefix = "TEST_CONSUMER_RETRY_RUNNING_"; + for (int no = 0; no < PARTITION_COUNT; no++) { + final int currentThread = no; + final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() { + + private int excCount = 0; + + @Override + public void process(ExecutorService executorService, String msg, int partition, long msgOffset) { + if (excCount < KafkaQueueConsumer.MAX_PROCESSING_EXCEPTIONS - 1) { + excCount++; + logger.info("Throw exception " + excCount + " on consumer " + currentThread); + throw new RuntimeException("Forced error on consumer"); + } + consumedMsgs.add(msg); + logger.info("process " + msg + " in thread " + currentThread); + } + }; + + KafkaQueueConsumer cnsmr = new KafkaQueueConsumer(topicName, kafkaConsumerProperties, requestConsumer); + + final String consumerThread = threadPrefix + no; + final ThreadStartupResult startUnmanagedThread = threadManager.startUnmanagedThread(consumerThread, cnsmr); + startupList.add(startUnmanagedThread); + } + try { + threadManager.waitForThreadsToBeReady(30000, 
startupList); + for (int no = 0; no < PARTITION_COUNT; no++) { + for (int msgNo = 0; msgNo < MSG_PER_PARTITION; msgNo++) { + sendMsg("Partition " + no + " msg " + msgNo); + } + } + + int totalWait = 0; + boolean done = false; + while (totalWait < 30 && !done) { + if (consumedMsgs.size() == PARTITION_COUNT * MSG_PER_PARTITION) { + done = true; + } + totalWait++; + Thread.sleep(500); + } + Assert.assertEquals(PARTITION_COUNT * MSG_PER_PARTITION, consumedMsgs.size()); + + for (int no = 0; no < PARTITION_COUNT; no++) { + final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(threadPrefix + no); + Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, stateOfUnmanagedThread); + } + } catch (TimeoutException e) { + Assert.fail("Consumer could not be started on time"); + } + } + + private Properties getConsumerProps() { + SettingsManager odfConfig = new ODFFactory().create().getSettingsManager(); + Properties kafkaConsumerProperties = odfConfig.getKafkaConsumerProperties(); + final String groupId = "retrying-dummy-consumer"; + kafkaConsumerProperties.put("group.id", groupId); + kafkaConsumerProperties.put("zookeeper.connect", zookeeperHost); + final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator(); + StringBuilder brokersString = new StringBuilder(); + while (brokers.hasNext()) { + brokersString.append(brokers.next()); + if (brokers.hasNext()) { + brokersString.append(","); + } + } + kafkaConsumerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString()); + kafkaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + kafkaConsumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + kafkaConsumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + + return 
kafkaConsumerProperties; + } + + void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException { + SettingsManager odfConfig = new ODFFactory().create().getSettingsManager(); + Properties props = odfConfig.getKafkaProducerProperties(); + final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator(); + StringBuilder brokersString = new StringBuilder(); + while (brokers.hasNext()) { + brokersString.append(brokers.next()); + if (brokers.hasNext()) { + brokersString.append(","); + } + } + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString()); + //Should we use a custom partitioner? we could try to involve consumer offsets and always put on "emptiest" partition + //props.put("partitioner.class", TestMessagePartitioner.class); + + final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); + ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, UUID.randomUUID().toString(), msg); + producer.send(producerRecord).get(3000, TimeUnit.MILLISECONDS); + producer.close(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java new file mode 100755 index 0000000..d1c9810 --- /dev/null +++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java @@ -0,0 +1,99 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.messaging.kafka; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus; +import org.apache.atlas.odf.api.analysis.AnalysisResponse; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.analysis.AnalysisManager; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.core.test.ODFTestcase; +import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest; + +public class ParallelServiceErrorTest extends ODFTestcase { + private static final int NUMBER_OF_QUEUED_REQUESTS = 1; + Logger log = ODFTestLogger.get(); + + @Test + public void runDataSetsInParallelError() throws Exception { + runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] { "successID1", "errorID2" }), AnalysisRequestStatus.State.FINISHED, AnalysisRequestStatus.State.ERROR); + } + + private void runDataSetsInParallelAndCheckResult(List<String> dataSetIDs, AnalysisRequestStatus.State... 
expectedState) throws Exception { + log.info("Running data sets in parallel: " + dataSetIDs); + log.info("Expected state: " + expectedState); + AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager(); + + List<AnalysisRequest> requests = new ArrayList<AnalysisRequest>(); + List<AnalysisResponse> responses = new ArrayList<AnalysisResponse>(); + List<String> idList = new ArrayList<String>(); + + for (int no = 0; no < NUMBER_OF_QUEUED_REQUESTS; no++) { + for (String dataSet : dataSetIDs) { + final AnalysisRequest req = ODFAPITest.createAnalysisRequest(Arrays.asList(dataSet + UUID.randomUUID().toString())); + AnalysisResponse resp = analysisManager.runAnalysis(req); + req.setId(resp.getId()); + requests.add(req); + idList.add(resp.getId()); + responses.add(resp); + } + } + log.info("Parallel requests started: " + idList.toString()); + + Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), requests.size()); + Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), responses.size()); + + // check that requests are processed in parallel: + // there must be a point in time where both requests are in status "active" + log.info("Polling for status of parallel request..."); + boolean foundPointInTimeWhereBothRequestsAreActive = false; + int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS; + List<AnalysisRequestStatus.State> allSingleStates = new ArrayList<AnalysisRequestStatus.State>(); + do { + int foundActive = 0; + allSingleStates.clear(); + for (AnalysisRequest request : requests) { + final AnalysisRequestStatus.State state = analysisManager.getAnalysisRequestStatus(request.getId()).getState(); + if (state == AnalysisRequestStatus.State.ACTIVE) { + log.info("ACTIVE: " + request.getId() + " foundactive: " + foundActive); + foundActive++; + } else { + log.info("NOT ACTIVE " + request.getId() + " _ " + state); + } + allSingleStates.add(state); + } + if (foundActive > 1) { + foundPointInTimeWhereBothRequestsAreActive = true; + } + + 
maxPolls--; + Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING); + } while (maxPolls > 0 && Utils.containsNone(allSingleStates, new AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.ACTIVE, AnalysisRequestStatus.State.QUEUED })); + + Assert.assertTrue(maxPolls > 0); + Assert.assertTrue(foundPointInTimeWhereBothRequestsAreActive); + Assert.assertTrue(allSingleStates.containsAll(Arrays.asList(expectedState))); + } +}
