Repository: incubator-atlas Updated Branches: refs/heads/master 751b4c876 -> b627a681e
ATLAS-74 Create notification framework (shwethags) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b627a681 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b627a681 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b627a681 Branch: refs/heads/master Commit: b627a681edc795c71ccf13e75a79ef102e75a916 Parents: 751b4c8 Author: Shwetha GS <[email protected]> Authored: Wed Aug 5 15:21:00 2015 +0530 Committer: Shwetha GS <[email protected]> Committed: Wed Aug 5 15:21:00 2015 +0530 ---------------------------------------------------------------------- addons/hive-bridge/pom.xml | 4 - .../src/test/resources/application.properties | 64 ----- .../org/apache/atlas/ApplicationProperties.java | 10 +- client/src/main/resources/client.properties | 37 +++ notification/pom.xml | 75 +++++ .../org/apache/atlas/kafka/KafkaConsumer.java | 50 ++++ .../apache/atlas/kafka/KafkaNotification.java | 284 +++++++++++++++++++ .../notification/NotificationConsumer.java | 32 +++ .../notification/NotificationException.java | 26 ++ .../notification/NotificationHookConsumer.java | 87 ++++++ .../notification/NotificationInterface.java | 77 +++++ .../atlas/notification/NotificationModule.java | 28 ++ .../atlas/kafka/KafkaNotificationTest.java | 68 +++++ pom.xml | 29 +- release-log.txt | 1 + .../src/test/resources/application.properties | 65 ----- src/conf/application.properties | 4 + src/conf/client.properties | 10 +- .../src/main/resources/application.properties | 47 ++- typesystem/src/main/resources/log4j.xml | 26 +- webapp/src/main/java/org/apache/atlas/Main.java | 27 +- .../atlas/web/service/EmbeddedServer.java | 20 +- .../src/main/resources/application.properties | 60 ---- webapp/src/main/resources/log4j.xml | 2 +- .../java/org/apache/atlas/web/TestUtils.java | 5 + .../atlas/web/security/BaseSecurityTest.java | 10 +- .../web/service/SecureEmbeddedServerIT.java | 5 +- .../web/service/SecureEmbeddedServerITBase.java | 33 +-- 28 files changed, 906 insertions(+), 280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/addons/hive-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml index 66b5f74..914d8c6 100755 --- a/addons/hive-bridge/pom.xml +++ b/addons/hive-bridge/pom.xml @@ -256,10 +256,6 @@ <name>atlas.log.dir</name> <value>${project.build.directory}/logs</value> </systemProperty> - <systemProperty> - <name>atlas.conf</name> - <value>${project.build.directory}/test-classes</value> - </systemProperty> </systemProperties> <stopKey>atlas-stop</stopKey> <stopPort>41001</stopPort> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/addons/hive-bridge/src/test/resources/application.properties ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/resources/application.properties b/addons/hive-bridge/src/test/resources/application.properties deleted file mode 100644 index dda9a18..0000000 --- a/addons/hive-bridge/src/test/resources/application.properties +++ /dev/null @@ -1,64 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -######### Graph Database Configs ######### -#Refer http://s3.thinkaurelius.com/docs/titan/0.5.1/titan-config-ref.html -# Graph Storage -atlas.graph.storage.backend=${titan.storage.backend} - -#Berkeley storage directory -atlas.graph.storage.directory=target/data/berkley - -#hbase -#For standalone mode , specify localhost -#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 -atlas.graph.storage.hostname=${titan.storage.hostname} - -# Graph Search Index Backend -atlas.graph.index.search.backend=${titan.index.backend} - -#lucene -#atlas.graph.index.search.directory=target/data/lucene - -#elasticsearch -atlas.graph.index.search.directory=./target/data/es -atlas.graph.index.search.elasticsearch.client-only=false -atlas.graph.index.search.elasticsearch.local-mode=true -atlas.graph.index.search.elasticsearch.create.sleep=2000 - -#solr in cloud mode -atlas.graph.index.search.solr.mode=cloud -atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address} - -#solr in http mode -atlas.graph.index.search.solr.http-urls=http://localhost:8983/solr - -######### Hive Lineage Configs ######### -#atlas.lineage.hive.table.type.name=DataSet -#atlas.lineage.hive.process.type.name=Process -#atlas.lineage.hive.process.inputs.name=inputs -#atlas.lineage.hive.process.outputs.name=outputs - -## Schema -#atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns - - -######### Security Properties ######### - -# SSL config -atlas.enableTLS=false http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/client/src/main/java/org/apache/atlas/ApplicationProperties.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/ApplicationProperties.java b/client/src/main/java/org/apache/atlas/ApplicationProperties.java index 15cca47..738ec53 100644 --- a/client/src/main/java/org/apache/atlas/ApplicationProperties.java +++ b/client/src/main/java/org/apache/atlas/ApplicationProperties.java @@ -17,19 +17,17 @@ package org.apache.atlas; -import org.apache.commons.configuration.AbstractConfiguration; +import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.ConfigurationUtils; import org.apache.commons.configuration.PropertiesConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.net.URL; +import java.util.Arrays; import java.util.Iterator; -import java.util.regex.Matcher; -import java.util.regex.Pattern; public class ApplicationProperties extends PropertiesConfiguration { private static final Logger LOG = LoggerFactory.getLogger(ApplicationProperties.class); @@ -47,7 +45,9 @@ public class ApplicationProperties extends PropertiesConfiguration { if (INSTANCE == null) { synchronized (ApplicationProperties.class) { if (INSTANCE == null) { - INSTANCE = get(APPLICATION_PROPERTIES); + Configuration applicationProperties = get(APPLICATION_PROPERTIES); + Configuration clientProperties = get(CLIENT_PROPERTIES); + INSTANCE = new CompositeConfiguration(Arrays.asList(applicationProperties, clientProperties)); } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/client/src/main/resources/client.properties ---------------------------------------------------------------------- diff --git a/client/src/main/resources/client.properties b/client/src/main/resources/client.properties new file mode 100755 index 0000000..722d029 --- /dev/null +++ b/client/src/main/resources/client.properties @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +######### Security Properties ######### + +# SSL config + +atlas.enableTLS=false +#truststore.file=/path/to/truststore.jks +#cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks + +#following only required for 2-way SSL +#keystore.file=/path/to/keystore.jks + +# Authentication config + +# enabled: true or false +atlas.http.authentication.enabled=false +# type: simple or kerberos +atlas.http.authentication.type=simple + +######### Security Properties ######### http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/pom.xml ---------------------------------------------------------------------- diff --git a/notification/pom.xml b/notification/pom.xml new file mode 100644 index 0000000..b036855 --- /dev/null +++ b/notification/pom.xml @@ -0,0 +1,75 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>apache-atlas</artifactId> + <groupId>org.apache.atlas</groupId> + <version>0.6-incubating-SNAPSHOT</version> + </parent> + <artifactId>atlas-notification</artifactId> + <description>Apache Atlas Client</description> + <name>Apache Atlas Notification</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-typesystem</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </dependency> + + <dependency> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java new file mode 100644 index 0000000..70bb5d6 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.kafka; + +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.message.MessageAndMetadata; +import org.apache.atlas.notification.NotificationConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumer implements NotificationConsumer { + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class); + + private final int consumerId; + private final ConsumerIterator iterator; + + public KafkaConsumer(KafkaStream<String, String> stream, int consumerId) { + this.iterator = stream.iterator(); + this.consumerId = consumerId; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public String next() { + MessageAndMetadata message = iterator.next(); + LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}", + consumerId, message.topic(), message.partition(), message.offset(), message.message()); + return (String) message.message(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java new file mode 100644 index 0000000..9978275 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.kafka; + +import com.google.inject.Singleton; +import kafka.consumer.Consumer; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.serializer.StringDecoder; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.Time; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationException; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationConverter; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Future; + +@Singleton +public class KafkaNotification extends NotificationInterface { + public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class); + + public static final String PROPERTY_PREFIX = NotificationInterface.PROPERTY_PREFIX + ".kafka"; + + private static final int ATLAS_ZK_PORT = 9026; + private static final int ATLAS_KAFKA_PORT = 9027; + private static final String ATLAS_KAFKA_DATA = "data"; + + public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK"; + public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES"; + public static final String ATLAS_TYPES_TOPIC = "ATLAS_TYPES"; + + private static final String ATLAS_GROUP = "atlas"; + private KafkaServer kafkaServer; + private ServerCnxnFactory factory; + private Properties properties; + + private KafkaProducer producer = null; + private List<ConsumerConnector> consumerConnectors = new ArrayList<>(); + + private KafkaConsumer consumer; + + private static final Map<NotificationType, String> topicMap = new HashMap<NotificationType, String>() {{ + put(NotificationType.HOOK, ATLAS_HOOK_TOPIC); + put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC); + put(NotificationType.TYPES, ATLAS_TYPES_TOPIC); + }}; + + private synchronized void createProducer() { + if (producer == null) { + producer = new KafkaProducer(properties); + } + } + + @Override + public void initialize(Configuration applicationProperties) throws AtlasException { + super.initialize(applicationProperties); + Configuration subsetConfiguration = + ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX); + properties = ConfigurationConverter.getProperties(subsetConfiguration); + //override to store offset in kafka + //todo do we need ability to replay? + + //Override default configs + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + + //todo take group id as argument to allow multiple consumers?? + properties.put(ConsumerConfig.GROUP_ID_CONFIG, ATLAS_GROUP); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin"); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest"); + + if (isEmbedded()) { + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + ATLAS_KAFKA_PORT); + properties.setProperty("zookeeper.connect", "localhost:" + ATLAS_ZK_PORT); + } + + //todo new APIs not available yet +// consumer = new KafkaConsumer(properties); +// consumer.subscribe(ATLAS_HOOK_TOPIC); + } + + @Override + protected void _startService() throws IOException { + startZk(); + startKafka(); + } + + private String startZk() throws IOException { + //todo read zk endpoint from config + this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("0.0.0.0", ATLAS_ZK_PORT), 1024); + File snapshotDir = constructDir("zk/txn"); + File logDir = constructDir("zk/snap"); + + try { + factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500)); + } catch (InterruptedException e) { + throw new IOException(e); + } + return factory.getLocalAddress().getAddress().toString(); + } + + private void startKafka() { + Properties brokerConfig = properties; + brokerConfig.setProperty("broker.id", "1"); + //todo read kafka endpoint from config + brokerConfig.setProperty("host.name", "0.0.0.0"); + brokerConfig.setProperty("port", String.valueOf(ATLAS_KAFKA_PORT)); + brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath()); + brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1)); + + kafkaServer = new KafkaServer(new KafkaConfig(brokerConfig), new SystemTime()); + kafkaServer.startup(); + LOG.debug("Embedded kafka server started with broker config {}", brokerConfig); + } + + private static class SystemTime implements Time { + @Override + public long milliseconds() { + return System.currentTimeMillis(); + } + + @Override + public long nanoseconds() { + return System.nanoTime(); + } + + @Override + public void sleep(long arg0) { + try { + Thread.sleep(arg0); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private File constructDir(String dirPrefix) { + File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), dirPrefix); + if (!file.exists() && !file.mkdirs()) { + throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath()); + } + return file; + } + + @Override + public void _shutdown() { + if (producer != null) { + producer.close(); + } + + if (consumer != null) { + consumer.close(); + } + + for (ConsumerConnector consumerConnector : consumerConnectors) { + consumerConnector.shutdown(); + } + + if (kafkaServer != null) { + kafkaServer.shutdown(); + } + + if (factory != null) { + factory.shutdown(); + } + } + + @Override + public List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers) { + String topic = topicMap.get(type); + + ConsumerConnector consumerConnector = + Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(properties)); + Map<String, Integer> topicCountMap = new HashMap<>(); + topicCountMap.put(topic, numConsumers); + StringDecoder decoder = new StringDecoder(null); + Map<String, List<KafkaStream<String, String>>> streamsMap = + consumerConnector.createMessageStreams(topicCountMap, decoder, decoder); + List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic); + List<NotificationConsumer> consumers = new ArrayList<>(numConsumers); + int consumerId = 0; + for (KafkaStream stream : kafkaConsumers) { + consumers.add(new org.apache.atlas.kafka.KafkaConsumer(stream, consumerId++)); + } + consumerConnectors.add(consumerConnector); + + return consumers; + } + + @Override + public void send(NotificationType type, String... messages) throws NotificationException { + if (producer == null) { + createProducer(); + } + + String topic = topicMap.get(type); + List<Future<RecordMetadata>> futures = new ArrayList<>(); + for (String message : messages) { + ProducerRecord record = new ProducerRecord(topic, message); + LOG.debug("Sending message for topic {}: {}", topic, message); + futures.add(producer.send(record)); + } + + for (Future<RecordMetadata> future : futures) { + try { + RecordMetadata response = future.get(); + LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(), + response.partition(), response.offset()); + } catch (Exception e) { + throw new NotificationException(e); + } + } + } + + //New API, not used now + private List<String> receive(long timeout) throws NotificationException { + Map<String, ConsumerRecords> recordsMap = consumer.poll(timeout); + List<String> messages = new ArrayList<>(); + if (recordsMap != null) { + for (ConsumerRecords records : recordsMap.values()) { + List<ConsumerRecord> recordList = records.records(); + for (ConsumerRecord record : recordList) { + try { + String message = (String) record.value(); + LOG.debug("Received message from topic {}: {}", ATLAS_HOOK_TOPIC, message); + messages.add(message); + } catch (Exception e) { + throw new NotificationException(e); + } + } + } + } + return messages; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java new file mode 100644 index 0000000..c3ac23b --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +public interface NotificationConsumer { + /** + * If there are more messages + * @return + */ + boolean hasNext(); + + /** + * Next message - blocking call + * @return + */ + String next(); +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationException.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java new file mode 100644 index 0000000..e6b02fb --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import org.apache.atlas.AtlasException; + +public class NotificationException extends AtlasException { + public NotificationException(Exception e) { + super(e); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java new file mode 100644 index 0000000..36a62f0 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import com.google.inject.Inject; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class NotificationHookConsumer { + private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); + + public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; + public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address"; + + @Inject + private static NotificationInterface notificationInterface; + + private static ExecutorService executors; + private static AtlasClient atlasClient; + + public static void start() throws AtlasException { + Configuration applicationProperties = ApplicationProperties.get(); + notificationInterface.initialize(applicationProperties); + + String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000"); + atlasClient = new AtlasClient(atlasEndpoint); + int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 2); + List<NotificationConsumer> consumers = + notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads); + executors = Executors.newFixedThreadPool(consumers.size()); + + for (final NotificationConsumer consumer : consumers) { + executors.submit(new HookConsumer(consumer)); + } + } + + public static void stop() { + notificationInterface.shutdown(); + executors.shutdown(); + } + + static class HookConsumer implements Runnable { + private final NotificationConsumer consumer; + + public HookConsumer(NotificationConsumer consumerInterface) { + this.consumer = consumerInterface; + } + + @Override + public void run() { + while(consumer.hasNext()) { + String entityJson = consumer.next(); + LOG.debug("Processing message {}", entityJson); + try { + atlasClient.createEntity(entityJson); + } catch (AtlasServiceException e) { + //todo handle failures + LOG.warn("Error handling message {}", entityJson); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java new file mode 100644 index 0000000..0951124 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import org.apache.atlas.AtlasException; +import org.apache.commons.configuration.Configuration; + +import java.io.IOException; +import java.util.List; + +public abstract class NotificationInterface { + public static final String PROPERTY_PREFIX = "atlas.notification"; + private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded"; + private boolean embedded; + + + public enum NotificationType { + HOOK, ENTITIES, TYPES + } + + /** + * Initialise + * @param applicationProperties + * @throws AtlasException + */ + public void initialize(Configuration applicationProperties) throws AtlasException { + this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false); + } + + /** + * Start embedded notification service on atlast server + * @throws IOException + */ + public final void startService() throws IOException { + if (embedded) { + _startService(); + } + } + + /** + * Is the notification service embedded in atlas server + * @return + */ + protected final boolean isEmbedded() { + return embedded; + } + + protected abstract void _startService() throws IOException; + + /** + * Shutdown - close all the connections + */ + public final void shutdown() { + _shutdown(); + } + + protected abstract void _shutdown(); + + public abstract List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers); + + public abstract void send(NotificationType type, String... messages) throws NotificationException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java new file mode 100644 index 0000000..db17e35 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import com.google.inject.AbstractModule; +import org.apache.atlas.kafka.KafkaNotification; + +public class NotificationModule extends AbstractModule { + @Override + protected void configure() { + bind(NotificationInterface.class).to(KafkaNotification.class).asEagerSingleton(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java new file mode 100644 index 0000000..02752dc --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.kafka; + +import com.google.inject.Inject; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.notification.NotificationModule; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.RandomStringUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +@Guice(modules = NotificationModule.class) +public class KafkaNotificationTest { + + @Inject + private NotificationInterface kafka; + + @BeforeClass + public void setUp() throws Exception { + Configuration conf = ApplicationProperties.get(); + conf.setProperty(KafkaNotification.PROPERTY_PREFIX + ".data", "target/data/kafka" + random()); + kafka.initialize(conf); + kafka.startService(); + } + + @Test + public void testSendMessage() throws AtlasException { + String msg1 = "message" + random(); + String msg2 = "message" + random(); + kafka.send(NotificationInterface.NotificationType.HOOK, msg1, msg2); + NotificationConsumer consumer = kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); + Assert.assertTrue(consumer.hasNext()); + Assert.assertEquals(msg1, consumer.next()); + Assert.assertTrue(consumer.hasNext()); + Assert.assertEquals(msg2, consumer.next()); + } + + private String random() { + return RandomStringUtils.randomAlphanumeric(5); + } + + @AfterClass + public void teardown() throws Exception { + kafka.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 66182e7..facd539 100755 --- a/pom.xml +++ b/pom.xml @@ -329,7 +329,8 @@ <titan.version>0.5.4</titan.version> <hadoop.version>2.7.0</hadoop.version> <hbase.version>0.98.9-hadoop2</hbase.version> - + <kafka.version>0.8.2.0</kafka.version> + <!-- scala versions --> <scala.version>2.10.4</scala.version> <scala.binary.version>2.10</scala.binary.version> @@ -420,6 +421,7 @@ </profiles> <modules> <module>typesystem</module> + <module>notification</module> <module>client</module> <module>repository</module> <module>webapp</module> @@ -933,6 +935,12 @@ <dependency> <groupId>org.apache.atlas</groupId> + <artifactId>atlas-notification</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> <artifactId>atlas-client</artifactId> <version>${project.version}</version> </dependency> @@ -1114,6 +1122,25 @@ <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> + + <!-- kafka --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <exclusions> + <exclusion> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> </dependencyManagement> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 4e53ad5..a2d2b68 100644 --- a/release-log.txt +++ b/release-log.txt @@ -8,6 +8,7 @@ ATLAS-54 Rename configs in hive hook (shwethags) ATLAS-3 Mixed Index creation fails with Date types (suma.shivaprasad via shwethags) ALL CHANGES: +ATLAS-74 Create notification framework (shwethags) ATLAS-93 import-hive.sh reports FileNotFoundException (shwethags) ATLAS-92 import-hive.sh failed to find HiveMetaStoreBridge (airbots via shwethags) ATLAS-16 jersey jaxb exception (shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/repository/src/test/resources/application.properties ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/application.properties b/repository/src/test/resources/application.properties deleted file mode 100755 index d0eaa8c..0000000 --- a/repository/src/test/resources/application.properties +++ /dev/null @@ -1,65 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -######### Graph Database Configs ######### -#Refer http://s3.thinkaurelius.com/docs/titan/0.5.1/titan-config-ref.html -# Graph Storage - -atlas.graph.storage.backend=${titan.storage.backend} - -#Berkeley storage directory -atlas.graph.storage.directory=target/data/berkley - -#hbase -#For standalone mode , specify localhost -#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 -atlas.graph.storage.hostname=${titan.storage.hostname} - -# Graph Search Index Backend -atlas.graph.index.search.backend=${titan.index.backend} - -#lucene -#atlas.graph.index.search.directory=target/data/lucene - -#elasticsearch -atlas.graph.index.search.directory=./target/data/es -atlas.graph.index.search.elasticsearch.client-only=false -atlas.graph.index.search.elasticsearch.local-mode=true -atlas.graph.index.search.elasticsearch.create.sleep=2000 - -#solr in cloud mode -atlas.graph.index.search.solr.mode=cloud -atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address} - -#solr in http mode -atlas.graph.index.search.solr.http-urls=http://localhost:8983/solr - -######### Hive Lineage Configs ######### -#atlas.lineage.hive.table.type.name=DataSet -#atlas.lineage.hive.process.type.name=Process -#atlas.lineage.hive.process.inputs.name=inputs -#atlas.lineage.hive.process.outputs.name=outputs - -## Schema -atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns - - -######### Security Properties ######### - -# SSL config -atlas.enableTLS=false http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/src/conf/application.properties ---------------------------------------------------------------------- diff --git a/src/conf/application.properties b/src/conf/application.properties index 5487749..bf323a7 100755 --- a/src/conf/application.properties +++ b/src/conf/application.properties @@ -45,6 +45,10 @@ atlas.graph.index.search.elasticsearch.client-only=false atlas.graph.index.search.elasticsearch.local-mode=true atlas.graph.index.search.elasticsearch.create.sleep=2000 +######### Notification Configs ######### +atlas.notification.embedded=true +atlas.notification.kafka.data=${sys:atlas.home}/data/kafka + ######### Hive Lineage Configs ######### # This models reflects the base super types for Data and Process #atlas.lineage.hive.table.type.name=DataSet http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/src/conf/client.properties ---------------------------------------------------------------------- diff --git a/src/conf/client.properties b/src/conf/client.properties index b64755f..ab6ef2f 100755 --- a/src/conf/client.properties +++ b/src/conf/client.properties @@ -21,10 +21,12 @@ # SSL config atlas.enableTLS=false -truststore.file=/path/to/truststore.jks -cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks -# following only required for 2-way SSL -keystore.file=/path/to/keystore.jks + +#truststore.file=/path/to/truststore.jks +#cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks + +#following only required for 2-way SSL +#keystore.file=/path/to/keystore.jks # Authentication config http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/typesystem/src/main/resources/application.properties ---------------------------------------------------------------------- diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties index 29c933f..f7e2774 100644 --- a/typesystem/src/main/resources/application.properties +++ b/typesystem/src/main/resources/application.properties @@ -18,8 +18,47 @@ ######### Graph Database Configs ######### # Graph Storage -atlas.graph.storage.backend=inmemory +atlas.graph.storage.backend=${titan.storage.backend} -# Graph Search Index -atlas.graph.index.search.backend=lucene -atlas.graph.index.search.directory=target/data/lucene +# Graph Search Index Backend +atlas.graph.index.search.backend=${titan.index.backend} + +#Berkeley storage directory +atlas.graph.storage.directory=target/data/berkley + +#hbase +#For standalone mode , specify localhost +#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 +atlas.graph.storage.hostname=${titan.storage.hostname} + +#ElasticSearch +atlas.graph.index.search.directory=target/data/es +atlas.graph.index.search.elasticsearch.client-only=false +atlas.graph.index.search.elasticsearch.local-mode=true +atlas.graph.index.search.elasticsearch.create.sleep=2000 + +# Solr cloud mode properties +atlas.graph.index.search.solr.mode=cloud +atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address} + +######### Hive Lineage Configs ######### +# This models reflects the base super types for Data and Process +#atlas.lineage.hive.table.type.name=DataSet +#atlas.lineage.hive.process.type.name=Process +#atlas.lineage.hive.process.inputs.name=inputs +#atlas.lineage.hive.process.outputs.name=outputs + +## Schema +atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns + +######### Notification Configs ######### +atlas.notification.embedded=true +atlas.notification.implementation=org.apache.atlas.kafka.KafkaNotification +atlas.notification.kafka.data=target/data/kafka + +######### Security Properties ######### + +# SSL config +atlas.enableTLS=false + +######### Security Properties ######### http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/typesystem/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/typesystem/src/main/resources/log4j.xml b/typesystem/src/main/resources/log4j.xml index 999caad..528881a 100755 --- a/typesystem/src/main/resources/log4j.xml +++ b/typesystem/src/main/resources/log4j.xml @@ -27,15 +27,6 @@ </layout> </appender> - <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender"> - <param name="File" value="${user.dir}/target/logs/application.log"/> - <param name="Append" value="true"/> - <param name="Threshold" value="debug"/> - <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/> - </layout> - </appender> - <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender"> <param name="File" value="${user.dir}/target/logs/audit.log"/> <param name="Append" value="true"/> @@ -55,23 +46,8 @@ <appender-ref ref="console"/> </logger> - <logger name="com.thinkaurelius.titan" additivity="false"> - <level value="warn"/> - <appender-ref ref="console"/> - </logger> - - <logger name="org.elasticsearch" additivity="false"> - <level value="warn"/> - <appender-ref ref="console"/> - </logger> - - <logger name="org.apache.lucene" additivity="false"> - <level value="warn"/> - <appender-ref ref="console"/> - </logger> - <root> - <priority value="info"/> + <priority value="warn"/> <appender-ref ref="console"/> </root> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/java/org/apache/atlas/Main.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/Main.java b/webapp/src/main/java/org/apache/atlas/Main.java index b71ca82..7b13f3d 100755 --- a/webapp/src/main/java/org/apache/atlas/Main.java +++ b/webapp/src/main/java/org/apache/atlas/Main.java @@ -41,10 +41,27 @@ public final class Main { private static final String APP_PORT = "port"; private static final String ATLAS_HOME = "atlas.home"; private static final String ATLAS_LOG_DIR = "atlas.log.dir"; - public static final String ATLAS_SERVER_HTTPS_PORT = - "atlas.server.https.port"; - public static final String ATLAS_SERVER_HTTP_PORT = - "atlas.server.http.port"; + public static final String ATLAS_SERVER_HTTPS_PORT = "atlas.server.https.port"; + public static final String ATLAS_SERVER_HTTP_PORT = "atlas.server.http.port"; + + private static EmbeddedServer server; + + static { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + shutdown(); + } catch (Exception e) { + LOG.debug("Failed to shutdown", e); + } + } + }); + } + + private static void shutdown() { + server.stop(); + } /** * Prevent users from constructing this. @@ -84,7 +101,7 @@ public final class Main { configuration.setProperty("atlas.enableTLS", String.valueOf(enableTLS)); showStartupInfo(buildConfiguration, enableTLS, appPort); - EmbeddedServer server = EmbeddedServer.newServer(appPort, appPath, enableTLS); + server = EmbeddedServer.newServer(appPort, appPath, enableTLS); server.start(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java index 80df87d..871d857 100755 --- a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java @@ -18,14 +18,16 @@ package org.apache.atlas.web.service; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.atlas.ApplicationProperties; +import org.apache.commons.configuration.Configuration; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.webapp.WebAppContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -33,6 +35,8 @@ import java.io.IOException; * This class embeds a Jetty server and a connector. */ public class EmbeddedServer { + public static final Logger LOG = LoggerFactory.getLogger(EmbeddedServer.class); + private static final int DEFAULT_BUFFER_SIZE = 16192; protected final Server server = new Server(); @@ -71,9 +75,9 @@ public class EmbeddedServer { protected Integer getBufferSize() { try { - PropertiesConfiguration configuration = new PropertiesConfiguration("application.properties"); + Configuration configuration = ApplicationProperties.get(); return configuration.getInt("atlas.jetty.request.buffer.size", DEFAULT_BUFFER_SIZE); - } catch (ConfigurationException e) { + } catch (Exception e) { // do nothing } @@ -85,7 +89,11 @@ public class EmbeddedServer { server.join(); } - public void stop() throws Exception { - server.stop(); + public void stop() { + try { + server.stop(); + } catch (Exception e) { + LOG.warn("Error during shutdown", e); + } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/resources/application.properties ---------------------------------------------------------------------- diff --git a/webapp/src/main/resources/application.properties b/webapp/src/main/resources/application.properties deleted file mode 100755 index ecfdc38..0000000 --- a/webapp/src/main/resources/application.properties +++ /dev/null @@ -1,60 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -######### Graph Database Configs ######### -# Graph Storage -atlas.graph.storage.backend=${titan.storage.backend} - -# Graph Search Index Backend -atlas.graph.index.search.backend=${titan.index.backend} - -#Berkeley storage directory -atlas.graph.storage.directory=target/data/berkley - -#hbase -#For standalone mode , specify localhost -#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 -atlas.graph.storage.hostname=${titan.storage.hostname} - -#ElasticSearch -atlas.graph.index.search.directory=target/data/es -atlas.graph.index.search.elasticsearch.client-only=false -atlas.graph.index.search.elasticsearch.local-mode=true -atlas.graph.index.search.elasticsearch.create.sleep=2000 - -# Solr cloud mode properties -atlas.graph.index.search.solr.mode=cloud -atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address} - -######### Hive Lineage Configs ######### -# This models reflects the base super types for Data and Process -#atlas.lineage.hive.table.type.name=DataSet -#atlas.lineage.hive.process.type.name=Process -#atlas.lineage.hive.process.inputs.name=inputs -#atlas.lineage.hive.process.outputs.name=outputs - -## Schema -atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns - - -######### Security Properties ######### - -# SSL config -atlas.enableTLS=false - -######### Security Properties ######### http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/webapp/src/main/resources/log4j.xml b/webapp/src/main/resources/log4j.xml index 5c42c98..7827c1a 100755 --- a/webapp/src/main/resources/log4j.xml +++ b/webapp/src/main/resources/log4j.xml @@ -57,7 +57,7 @@ </logger> <root> - <priority value="info"/> + <priority value="warn"/> <appender-ref ref="FILE"/> </root> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/TestUtils.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/TestUtils.java b/webapp/src/test/java/org/apache/atlas/web/TestUtils.java index ede041e..47bea1f 100644 --- a/webapp/src/test/java/org/apache/atlas/web/TestUtils.java +++ b/webapp/src/test/java/org/apache/atlas/web/TestUtils.java @@ -46,4 +46,9 @@ public class TestUtils { public static String getTempDirectory() { return System.getProperty("projectBaseDir") + "/webapp/target/" + random(); } + + public static String getWarPath() { + return System.getProperty("projectBaseDir") + String.format("/webapp/target/atlas-webapp-%s", + System.getProperty("project.version")); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java index 614638c..8af4a7e 100644 --- a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java +++ b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java @@ -16,9 +16,9 @@ */ package org.apache.atlas.web.security; +import org.apache.atlas.web.TestUtils; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang.RandomStringUtils; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.ssl.SSLHostnameVerifier; @@ -35,10 +35,7 @@ import java.nio.file.Files; import java.util.Locale; import java.util.Properties; -import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH; -import static org.apache.atlas.security.SecurityProperties.KEYSTORE_FILE_KEY; -import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED; -import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_FILE_KEY; +import static org.apache.atlas.security.SecurityProperties.*; /** * @@ -110,8 +107,7 @@ public class BaseSecurityTest { } protected String getWarPath() { - return System.getProperty("projectBaseDir") + String.format("/webapp/target/atlas-webapp-%s", - System.getProperty("project.version")); + return TestUtils.getWarPath(); } protected PropertiesConfiguration getSSLConfiguration(String providerUrl) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java index 7eb36d8..e1f9b54 100644 --- a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java @@ -17,6 +17,7 @@ package org.apache.atlas.web.service; +import org.apache.atlas.web.TestUtils; import org.apache.commons.configuration.PropertiesConfiguration; import org.testng.Assert; import org.testng.annotations.Test; @@ -37,9 +38,7 @@ public class SecureEmbeddedServerIT extends SecureEmbeddedServerITBase { SecureEmbeddedServer secureEmbeddedServer = null; try { - String appPath = System.getProperty("user.dir") + getWarPath(); - - secureEmbeddedServer = new SecureEmbeddedServer(21443, appPath) { + secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) { @Override protected PropertiesConfiguration getConfiguration() { return configuration; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java index 9a5b8ad..f7c3625 100755 --- a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java +++ b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java @@ -19,6 +19,7 @@ package org.apache.atlas.web.service; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.DefaultClientConfig; +import org.apache.atlas.web.TestUtils; import org.apache.atlas.web.resources.AdminJerseyResourceIT; import org.apache.atlas.web.resources.BaseResourceIT; import org.apache.atlas.web.resources.EntityJerseyResourceIT; @@ -31,7 +32,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.alias.CredentialProvider; import org.apache.hadoop.security.alias.CredentialProviderFactory; import org.apache.hadoop.security.alias.JavaKeyStoreProvider; -import org.eclipse.jetty.webapp.WebAppContext; import org.testng.Assert; import org.testng.TestListenerAdapter; import org.testng.TestNG; @@ -45,11 +45,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; -import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH; -import static org.apache.atlas.security.SecurityProperties.DEFAULT_KEYSTORE_FILE_LOCATION; -import static org.apache.atlas.security.SecurityProperties.KEYSTORE_PASSWORD_KEY; -import static org.apache.atlas.security.SecurityProperties.SERVER_CERT_PASSWORD_KEY; -import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_PASSWORD_KEY; +import static org.apache.atlas.security.SecurityProperties.*; /** * Secure Test class for jersey resources. @@ -106,18 +102,13 @@ public class SecureEmbeddedServerITBase { public void testNoConfiguredCredentialProvider() throws Exception { try { - secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas"); - WebAppContext webapp = new WebAppContext(); - webapp.setContextPath("/"); - webapp.setWar(System.getProperty("user.dir") + getWarPath()); - secureEmbeddedServer.server.setHandler(webapp); - + secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()); secureEmbeddedServer.server.start(); Assert.fail("Should have thrown an exception"); } catch (IOException e) { - Assert.assertEquals("No credential provider path configured for storage of certificate store passwords", - e.getMessage()); + Assert.assertEquals(e.getMessage(), + "No credential provider path configured for storage of certificate store passwords"); } finally { secureEmbeddedServer.server.stop(); } @@ -130,7 +121,7 @@ public class SecureEmbeddedServerITBase { configuration.setProperty(CERT_STORES_CREDENTIAL_PROVIDER_PATH, providerUrl); try { - secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas") { + secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) { @Override protected PropertiesConfiguration getConfiguration() { return configuration; @@ -157,17 +148,12 @@ public class SecureEmbeddedServerITBase { setupCredentials(); try { - secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas") { + secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) { @Override protected PropertiesConfiguration getConfiguration() { return configuration; } }; - WebAppContext webapp = new WebAppContext(); - webapp.setContextPath("/"); - webapp.setWar(System.getProperty("user.dir") + getWarPath()); - secureEmbeddedServer.server.setHandler(webapp); - secureEmbeddedServer.server.start(); TestListenerAdapter tla = new TestListenerAdapter(); @@ -184,11 +170,6 @@ public class SecureEmbeddedServerITBase { } - protected String getWarPath() { - return String - .format("/target/atlas-webapp-%s", System.getProperty("project.version")); - } - protected void setupCredentials() throws Exception { Configuration conf = new Configuration(false);
