/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.odf.core.test.messaging.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.logging.Logger;

import org.apache.atlas.odf.api.analysis.AnalysisRequest;
import org.apache.atlas.odf.api.analysis.AnalysisResponse;
import org.junit.Assert;
import org.junit.Test;

import org.apache.atlas.odf.api.ODFFactory;
import org.apache.atlas.odf.api.analysis.AnalysisManager;
import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
import org.apache.atlas.odf.core.Utils;
import org.apache.atlas.odf.core.test.ODFTestLogger;
import org.apache.atlas.odf.core.test.ODFTestcase;
import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;

/**
 * Verifies that analysis requests submitted back-to-back are processed in
 * parallel by the messaging layer: while polling the request states there must
 * be at least one point in time where more than one request is ACTIVE
 * simultaneously, and all requests must end in the expected final states.
 */
public class ParallelServiceTest extends ODFTestcase {
	/** How many requests are queued per data set ID. */
	private static final int NUMBER_OF_QUEUED_REQUESTS = 1;
	Logger log = ODFTestLogger.get();

	@Test
	public void runDataSetsInParallelSuccess() throws Exception {
		runDataSetsInParallelAndCheckResult(Arrays.asList("successID1", "successID2"), State.FINISHED, State.FINISHED);
	}

	/**
	 * Submits one analysis request per data set ID (times NUMBER_OF_QUEUED_REQUESTS),
	 * polls their states, and asserts that (a) at least two requests were ACTIVE at
	 * the same time and (b) the final states include all of {@code expectedState}.
	 *
	 * @param dataSetIDs    data set ID prefixes; a random UUID is appended to each
	 *                      so repeated test runs do not collide
	 * @param expectedState the final state expected for each request
	 */
	private void runDataSetsInParallelAndCheckResult(List<String> dataSetIDs, State... expectedState) throws Exception {
		log.info("Running data sets in parallel: " + dataSetIDs);
		// FIX: expectedState is an array; plain concatenation logged "[LState;@hash"
		// instead of the actual values.
		log.info("Expected state: " + Arrays.toString(expectedState));
		AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();

		List<AnalysisRequest> requests = new ArrayList<AnalysisRequest>();
		List<AnalysisResponse> responses = new ArrayList<AnalysisResponse>();
		List<String> idList = new ArrayList<String>();

		// Kick off all requests before polling so they can overlap in time.
		for (int no = 0; no < NUMBER_OF_QUEUED_REQUESTS; no++) {
			for (String dataSet : dataSetIDs) {
				final AnalysisRequest req = ODFAPITest.createAnalysisRequest(Arrays.asList(dataSet + UUID.randomUUID().toString()));
				AnalysisResponse resp = analysisManager.runAnalysis(req);
				req.setId(resp.getId());
				requests.add(req);
				idList.add(resp.getId());
				responses.add(resp);
			}
		}
		log.info("Parallel requests started: " + idList.toString());

		Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), requests.size());
		Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), responses.size());

		// Check that requests are processed in parallel:
		// there must be a point in time where at least two requests are ACTIVE.
		log.info("Polling for status of parallel request...");
		boolean foundPointInTimeWhereBothRequestsAreActive = false;
		int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
		List<State> allSingleStates = new ArrayList<AnalysisRequestStatus.State>();
		do {
			int foundActive = 0;
			allSingleStates.clear();
			for (AnalysisRequest request : requests) {
				final State state = analysisManager.getAnalysisRequestStatus(request.getId()).getState();
				if (state == State.ACTIVE) {
					log.info("ACTIVE: " + request.getId() + " foundactive: " + foundActive);
					foundActive++;
				} else {
					log.info("NOT ACTIVE " + request.getId() + " _ " + state);
				}
				allSingleStates.add(state);
			}
			if (foundActive > 1) {
				foundPointInTimeWhereBothRequestsAreActive = true;
			}

			maxPolls--;
			Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
			// NOTE(review): loop continues while Utils.containsNone(states, {ACTIVE, QUEUED})
			// is true, i.e. while NO request is active or queued. That reads inverted for a
			// "wait until all done" poll — confirm the semantics of Utils.containsNone.
		} while (maxPolls > 0 && Utils.containsNone(allSingleStates, new State[] { State.ACTIVE, State.QUEUED }));

		Assert.assertTrue(maxPolls > 0);
		Assert.assertTrue(foundPointInTimeWhereBothRequestsAreActive);
		Assert.assertTrue(allSingleStates.containsAll(Arrays.asList(expectedState)));
	}
}
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.odf.core.test.messaging.kafka;

import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.atlas.odf.core.test.TestEnvironmentInitializer;

/**
 * Test environment initializer that boots the embedded test Kafka broker
 * (which in turn starts an embedded Zookeeper) before messaging tests run.
 * A startup failure is fatal for the test environment and is rethrown.
 */
public class TestEnvironmentMessagingInitializer implements TestEnvironmentInitializer {

	public TestEnvironmentMessagingInitializer() {
	}

	/**
	 * Starts the embedded test Kafka via {@link TestKafkaStarter}.
	 *
	 * @throws RuntimeException if Kafka (or Zookeeper) could not be started
	 */
	public void start() {
		Logger logger = Logger.getLogger(TestEnvironmentMessagingInitializer.class.getName());
		try {
			logger.info("Starting Test-Kafka during initialization...");
			TestKafkaStarter starter = new TestKafkaStarter();
			starter.startKafka();
			logger.info("Test-Kafka initialized");
		} catch (Exception exc) {
			// FIX: was logged at Level.INFO although the exception aborts the whole
			// test environment; log at SEVERE and rethrow with the cause preserved.
			logger.log(Level.SEVERE, "Exception occurred while starting test kafka", exc);
			throw new RuntimeException(exc);
		}
	}

	@Override
	public void stop() {
		// Intentionally a no-op: TestKafkaStarter.shutdownKafka() does nothing and
		// the embedded servers run on daemon threads, so they die with the JVM.
		// (Replaces the auto-generated TODO stub.)
	}

	@Override
	public String getName() {
		// Identifier used by the test framework to refer to this initializer.
		return "Kafka1001";
	}
}
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.odf.core.test.messaging.kafka;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.rmi.NotBoundException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.wink.json4j.JSONObject;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

import org.apache.atlas.odf.core.Utils;

import kafka.cluster.Broker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import scala.collection.JavaConversions;
import scala.collection.Seq;

/**
 * Starts an embedded Zookeeper and Kafka broker for tests, configured from the
 * test-embedded-zookeeper.properties / test-embedded-kafka.properties resources.
 * If the configured ports are already in use, an externally running
 * Zookeeper/Kafka on those ports is reused instead. Server handles are held in
 * static fields, so at most one embedded instance exists per JVM.
 */
public class TestKafkaStarter {

	/**
	 * Recursively deletes a file or directory tree.
	 *
	 * @param path file or directory to delete; must exist
	 * @return true if everything was deleted
	 * @throws FileNotFoundException if {@code path} does not exist
	 */
	public static boolean deleteRecursive(File path) throws FileNotFoundException {
		if (!path.exists()) {
			throw new FileNotFoundException(path.getAbsolutePath());
		}
		boolean ret = true;
		if (path.isDirectory()) {
			for (File f : path.listFiles()) {
				ret = ret && deleteRecursive(f);
			}
		}
		return ret && path.delete();
	}

	// Static state: the embedded servers are shared across all instances in this JVM.
	static Thread zookeeperThread = null;
	static boolean kafkaStarted = false;
	static Object lockObject = new Object(); // guards startKafka()
	static KafkaServerStartable kafkaServer = null;
	static ZooKeeperServerMainWithShutdown zooKeeperServer = null;

	boolean cleanData = true; // all data is cleaned at server start !!

	public boolean isCleanData() {
		return cleanData;
	}

	public void setCleanData(boolean cleanData) {
		this.cleanData = cleanData;
	}

	Logger logger = Logger.getLogger(TestKafkaStarter.class.getName());

	void log(String s) {
		logger.info(s);
	}

	// Maximum time (ms) to wait for each server to come up.
	int zookeeperStartupTime = 10000;
	int kafkaStartupTime = 10000;

	// Exposes ZooKeeperServerMain.shutdown(), which is protected in the base class.
	static class ZooKeeperServerMainWithShutdown extends ZooKeeperServerMain {
		public void shutdown() {
			super.shutdown();
		}
	}

	/**
	 * Starts the embedded Zookeeper on a daemon thread (unless one was already
	 * started by this JVM), optionally wiping its data dir first, then blocks
	 * until a client connection succeeds or {@code zookeeperStartupTime} elapses.
	 * If the port is taken, falls back to an externally running Zookeeper.
	 */
	private void startZookeeper() throws Exception {
		log("Starting zookeeper");

		final Properties zkProps = Utils.readConfigProperties("org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties");
		final String zkPort = (String) zkProps.get("clientPort");
		if (zooKeeperServer == null) {
			log("zookeeper properties: " + zkProps);
			if (cleanData) {
				String dataDir = zkProps.getProperty("dataDir");
				log("Removing all data from zookeeper data dir " + dataDir);
				File dir = new File(dataDir);
				if (dir.exists()) {
					if (!deleteRecursive(dir)) {
						throw new IOException("Could not delete directory " + dataDir);
					}
				}
			}
			final ZooKeeperServerMainWithShutdown zk = new ZooKeeperServerMainWithShutdown();
			final ServerConfig serverConfig = new ServerConfig();
			log("Loading zookeeper config...");
			QuorumPeerConfig zkConfig = new QuorumPeerConfig();
			zkConfig.parseProperties(zkProps);
			serverConfig.readFrom(zkConfig);

			Runnable zookeeperStarter = new Runnable() {

				@Override
				public void run() {
					try {
						log("Now starting Zookeeper with API...");
						// Blocks until shutdown, hence the dedicated daemon thread.
						zk.runFromConfig(serverConfig);
					} catch (BindException ex) {
						// Port in use: probe for an external Zookeeper instead.
						log("Embedded zookeeper could not be started, port is already in use. Trying to use external zookeeper");
						ZooKeeper zk = null; // NOTE(review): shadows the outer 'zk' server variable
						try {
							zk = new ZooKeeper("localhost:" + zkPort, 5000, null);
							if (zk.getState().equals(States.CONNECTED)) {
								log("Using existing zookeeper running on port " + zkPort);
								return;
							} else {
								throw new NotBoundException();
							}
						} catch (Exception zkEx) {
							throw new RuntimeException("Could not connect to zookeeper on port " + zkPort + ". Please close all applications listening on this port.");
						} finally {
							if (zk != null) {
								try {
									zk.close();
								} catch (InterruptedException e) {
									logger.log(Level.WARNING, "An error occured closing the zk connection", e);
								}
							}
						}
					} catch (Exception e) {
						e.printStackTrace();
						throw new RuntimeException(e);
					}

				}
			};

			zookeeperThread = new Thread(zookeeperStarter);
			zookeeperThread.setDaemon(true); // do not keep the JVM alive
			zookeeperThread.start();
			log("Zookeeper start initiated");
			zooKeeperServer = zk;
		}
		// Wait (bounded) for the server — embedded or external — to accept connections.
		ZkConnection conn = new ZkConnection("localhost:" + zkPort);
		final CountDownLatch latch = new CountDownLatch(1);
		conn.connect(new Watcher() {

			@Override
			public void process(WatchedEvent event) {
				log("Zookeeper event: " + event.getState());
				if (event.getState().equals(KeeperState.SyncConnected)) {
					log("Zookeeper server up and running");
					latch.countDown();
				}
			}
		});

		boolean zkReady = latch.await(zookeeperStartupTime, TimeUnit.MILLISECONDS);
		if (zkReady) {
			log("Zookeeper initialized and started");

		} else {
			// NOTE(review): startup continues anyway; Kafka start below will fail later.
			logger.severe("Zookeeper could not be initialized within " + (zookeeperStartupTime / 1000) + " sec");
		}
		conn.close();
	}

	/** @return true once a broker (embedded or external) is known to be available. */
	public boolean isRunning() {
		return kafkaStarted;
	}

	/**
	 * Starts Zookeeper and then the embedded Kafka broker. Idempotent: returns
	 * immediately if a broker is already running. If the broker port is taken,
	 * verifies via Zookeeper that an external Kafka broker owns it and reuses
	 * that one. Waits up to {@code kafkaStartupTime} ms for broker registration.
	 *
	 * @throws Exception on configuration, I/O, or Zookeeper errors
	 */
	public void startKafka() throws Exception {
		synchronized (lockObject) {
			if (kafkaStarted) {
				log("Kafka already running");
				return;
			}
			this.startZookeeper();

			log("Starting Kafka server...");
			Properties kafkaProps = Utils.readConfigProperties("org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties");
			log("Kafka properties: " + kafkaProps);
			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProps);
			int kafkaPort = kafkaConfig.port();
			// Only wipe old log data when no other process is using the port.
			if (cleanData && isPortAvailable(kafkaPort)) {
				String logDir = kafkaProps.getProperty("log.dirs");
				log("Removing all data from kafka log dir: " + logDir);
				File dir = new File(logDir);
				if (dir.exists()) {
					if (!deleteRecursive(dir)) {
						throw new IOException("Kafka logDir could not be deleted: " + logDir);
					}
				}
			}
			if (!isPortAvailable(kafkaPort)) {
				log("Kafka port " + kafkaPort + " is already in use. " + "Checking if zookeeper has a registered broker on this port to make sure it is an existing kafka instance using the port.");
				ZooKeeper zk = new ZooKeeper(kafkaConfig.zkConnect(), 10000, null);
				try {
					List<String> ids = zk.getChildren("/brokers/ids", false);
					if (ids != null && !ids.isEmpty()) {
						for (String id : ids) {
							String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null), "UTF-8");
							JSONObject broker = new JSONObject(brokerInfo);
							// NOTE(review): 'port' can never be null here; the null check is redundant.
							Integer port = new Integer(String.valueOf(broker.get("port")));
							if (port != null && port.equals(kafkaPort)) {
								log("Using externally started kafka broker on port " + port);
								kafkaStarted = true;
								return;
							}
						}
					}
				} catch (NoNodeException ex) {
					log("No brokers registered with zookeeper!");
					throw new RuntimeException("Kafka broker port " + kafkaPort + " not available and no broker found! Please close all running applications listening on this port");
				} finally {
					if (zk != null) {
						try {
							zk.close();
						} catch (InterruptedException e) {
							logger.log(Level.WARNING, "An error occured closing the zk connection", e);
						}
					}
				}
			}
			KafkaServerStartable kafka = KafkaServerStartable.fromProps(kafkaProps);
			kafka.startup();
			log("Kafka server start initiated");

			kafkaServer = kafka;
			log("Give Kafka a maximum of " + kafkaStartupTime + " ms to start");
			// Poll Zookeeper once a second until our broker registers itself.
			ZkClient zk = new ZkClient(kafkaConfig.zkConnect(), 10000, 5000, ZKStringSerializer$.MODULE$);
			int maxRetryCount = kafkaStartupTime / 1000;
			int cnt = 0;
			while (cnt < maxRetryCount) {
				cnt++;
				Seq<Broker> allBrokersInCluster = new ZkUtils(zk, new ZkConnection(kafkaConfig.zkConnect()), false).getAllBrokersInCluster();
				List<Broker> brokers = JavaConversions.seqAsJavaList(allBrokersInCluster);
				for (Broker broker : brokers) {
					if (broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port() == kafkaPort) {
						log("Broker is registered, Kafka is available after " + cnt + " seconds");
						kafkaStarted = true;
						return;
					}
				}
				Thread.sleep(1000);
			}
			// NOTE(review): timeout is only logged, not thrown; callers see no failure.
			logger.severe("Kafka broker was not started after " + kafkaStartupTime + " ms");
		}
	}

	public void shutdownKafka() {
		// do nothing for shutdown
		// (Intentional no-op: servers run on daemon threads and die with the JVM.)
	}

	/**
	 * Checks whether both a TCP and a UDP socket can be bound on {@code port}.
	 *
	 * @return true if the port is free, false if it is in use
	 */
	boolean isPortAvailable(int port) {
		ServerSocket ss = null;
		DatagramSocket ds = null;
		try {
			ss = new ServerSocket(port);
			ss.setReuseAddress(true);
			ds = new DatagramSocket(port);
			ds.setReuseAddress(true);
			return true;
		} catch (IOException e) {
			// Bind failed: port is taken. Intentionally ignored; we return false below.
		} finally {
			if (ds != null) {
				ds.close();
			}

			if (ss != null) {
				try {
					ss.close();
				} catch (IOException e) {
					// Ignored: closing a probe socket failing has no consequence.
				}
			}
		}

		return false;
	}
}
a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties new file mode 100755 index 0000000..4769c95 --- /dev/null +++ b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties @@ -0,0 +1,136 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. 
+broker.id=0 + +############################# Socket Server Settings ############################# + +listeners=PLAINTEXT://:9092 + +# The port the socket server listens on +# port=9092 + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +#host.name=localhost + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name=<hostname routable by clients> + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port=<port accessible by clients> + +# The number of threads handling network requests +num.network.threads=3 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=102400 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs=/tmp/odf-embedded-test-kafka/kafka-logs + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=1 + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. 
+num.recovery.threads.per.data.dir=1 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=24 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
+log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zookeeperConnectionTimeoutMs=6000 http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties ---------------------------------------------------------------------- diff --git a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties new file mode 100755 index 0000000..7234e9c --- /dev/null +++ b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties @@ -0,0 +1,34 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir=/tmp/odf-embedded-test-kafka/zookeeper +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 + http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties ---------------------------------------------------------------------- diff --git a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties new file mode 100755 index 0000000..5611c29 --- /dev/null +++ b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties @@ -0,0 +1,18 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +## USE for TESTs only + +ODFConfigurationStorage=MockConfigurationStorage +SparkServiceExecutor=MockSparkServiceExecutor +NotificationManager=TestNotificationManager http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/.gitignore ---------------------------------------------------------------------- diff --git a/odf/odf-spark-example-application/.gitignore b/odf/odf-spark-example-application/.gitignore new file mode 100755 index 0000000..d523581 --- /dev/null +++ b/odf/odf-spark-example-application/.gitignore @@ -0,0 +1,20 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +.settings +target +.classpath +.project +.factorypath +.DS_Store +/bin/ http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/pom.xml ---------------------------------------------------------------------- diff --git a/odf/odf-spark-example-application/pom.xml b/odf/odf-spark-example-application/pom.xml new file mode 100755 index 0000000..a2baa9e --- /dev/null +++ b/odf/odf-spark-example-application/pom.xml @@ -0,0 +1,74 @@ +<?xml version="1.0"?> +<!-- +~ +~ Licensed under the Apache License, Version 2.0 (the "License"); +~ you may not use this file except in compliance with the License. 
+~ You may obtain a copy of the License at +~ +~ http://www.apache.org/licenses/LICENSE-2.0 +~ +~ Unless required by applicable law or agreed to in writing, software +~ distributed under the License is distributed on an "AS IS" BASIS, +~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~ See the License for the specific language governing permissions and +~ limitations under the License. +--> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.apache.atlas.odf</groupId> + <artifactId>odf</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>odf-spark-example-application</artifactId> + <packaging>jar</packaging> + <name>odf-spark-example-application</name> + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.3</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + </plugin> + </plugins> + + </build> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.11</artifactId> + <version>2.1.0</version> + <scope>provided</scope> + </dependency> + <dependency> <!-- Spark dependency --> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + <version>2.1.0</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-api</artifactId> + <version>1.2.0-SNAPSHOT</version> + 
</dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java ---------------------------------------------------------------------- diff --git a/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java new file mode 100755 index 0000000..f5f7b70 --- /dev/null +++ b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java @@ -0,0 +1,57 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.spark; + +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult; +import org.apache.atlas.odf.api.spark.SparkDiscoveryServiceBase; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import org.apache.atlas.odf.api.spark.SparkDiscoveryService; +import org.apache.atlas.odf.api.spark.SparkUtils; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse.ResponseCode; +import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer; +import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse; + +public class SparkDiscoveryServiceExample extends SparkDiscoveryServiceBase implements SparkDiscoveryService { + static Logger logger = Logger.getLogger(SparkDiscoveryServiceExample.class.getName()); + + @Override + public DataSetCheckResult checkDataSet(DataSetContainer dataSetContainer) { + logger.log(Level.INFO, "Checking data set access."); + DataSetCheckResult checkResult = new DataSetCheckResult(); + checkResult.setDataAccess(DataSetCheckResult.DataAccess.Possible); + Dataset<Row> df = SparkUtils.createDataFrame(this.spark, dataSetContainer, this.mds); + // Print first rows to check whether data frame can be accessed + df.show(10); + return checkResult; + } + + @Override + public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) { + logger.log(Level.INFO, "Starting discovery service."); + Dataset<Row> df = SparkUtils.createDataFrame(spark, request.getDataSetContainer(), this.mds); + Map<String,Dataset<Row>> annotationDataFrameMap = SummaryStatistics.processDataFrame(this.spark, df, null); + DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse(); + response.setCode(ResponseCode.OK); + response.setDetails("Discovery service successfully 
completed."); + response.setResult(SparkUtils.createAnnotationsFromDataFrameMap(request.getDataSetContainer(), annotationDataFrameMap, this.mds)); + return response; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java ---------------------------------------------------------------------- diff --git a/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java new file mode 100755 index 0000000..a7d1542 --- /dev/null +++ b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java @@ -0,0 +1,112 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.spark; + +import org.apache.atlas.odf.api.spark.SparkUtils; +import org.apache.spark.SparkFiles; + +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +public class SummaryStatistics { + static Logger logger = Logger.getLogger(SummaryStatistics.class.getName()); + private static final String CSV_FILE_PARAMETER = "-dataFile="; + // The following constant is defined in class DiscoveryServiceSparkEndpoint but is duplicated here to avoid dependencies to the ODF code: + private static final String ANNOTATION_PROPERTY_COLUMN_NAME = "ODF_ANNOTATED_COLUMN"; + + // The main method is only available for testing purposes and is not called by ODF + public static void main(String[] args) { + logger.log(Level.INFO, "Running spark launcher with arguments: " + args[0]); + if ((args[0] == null) || (!args[0].startsWith(CSV_FILE_PARAMETER))) { + System.out.println(MessageFormat.format("Error: Spark Application Parameter '{0}' is missing.", CSV_FILE_PARAMETER)); + System.exit(1); + } + String dataFilePath = SparkFiles.get(args[0].replace(CSV_FILE_PARAMETER, "")); + logger.log(Level.INFO, "Data file path is " + dataFilePath); + + // Create Spark session + SparkSession spark = SparkSession.builder().master("local").appName("ODF Spark example application").getOrCreate(); + + // Read CSV file into data frame + Dataset<Row> df = spark.read() + .format("com.databricks.spark.csv") + .option("inferSchema", "true") + .option("header", "true") + .load(dataFilePath); + + // Run actual job and print result + Map<String, Dataset<Row>> annotationDataFrameMap = null; + try { + annotationDataFrameMap = processDataFrame(spark, df, args); + } catch (Exception e) { + logger.log(Level.INFO, MessageFormat.format("An 
error occurred while processing data set {0}:", args[0]), e); + } finally { + // Close and stop spark context + spark.close(); + spark.stop(); + } + if (annotationDataFrameMap == null) { + System.exit(1); + } else { + // Print all annotationDataFrames for all annotation types to stdout + for (Map.Entry<String, Dataset<Row>> entry : annotationDataFrameMap.entrySet()) { + logger.log(Level.INFO, "Result data frame for annotation type " + entry.getKey() + ":"); + entry.getValue().show(); + } + } + } + + // The following method contains the actual implementation of the ODF Spark discovery service + public static Map<String,Dataset<Row>> processDataFrame(SparkSession spark, Dataset<Row> df, String[] args) { + logger.log(Level.INFO, "Started summary statistics Spark application."); + Map<String, Dataset<Row>> resultMap = new HashMap<String, Dataset<Row>>(); + + // Print input data set + df.show(); + + // Create column annotation data frame that contains basic data frame statistics + Dataset<Row> dfStatistics = df.describe(); + + // Rename "summary" column to ANNOTATION_PROPERTY_COLUMN_NAME + String[] columnNames = dfStatistics.columns(); + columnNames[0] = ANNOTATION_PROPERTY_COLUMN_NAME; + Dataset<Row> summaryStatistics = dfStatistics.toDF(columnNames); + summaryStatistics.show(); + String columnAnnotationTypeName = "SparkSummaryStatisticsAnnotation"; + + // Transpose table to turn it into format required by ODF + Dataset<Row> columnAnnotationDataFrame = SparkUtils.transposeDataFrame(spark, summaryStatistics); + columnAnnotationDataFrame.show(); + + // Create table annotation that contains the data frame's column count + String tableAnnotationTypeName = "SparkTableAnnotation"; + Dataset<Row> tableAnnotationDataFrame = columnAnnotationDataFrame.select(new Column("count")).limit(1); + tableAnnotationDataFrame.show(); + + // Add annotation data frames to result map + resultMap.put(columnAnnotationTypeName, columnAnnotationDataFrame); + resultMap.put(tableAnnotationTypeName, 
tableAnnotationDataFrame); + + logger.log(Level.INFO, "Spark job finished."); + return resultMap; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/.gitignore ---------------------------------------------------------------------- diff --git a/odf/odf-spark/.gitignore b/odf/odf-spark/.gitignore new file mode 100755 index 0000000..cde346c --- /dev/null +++ b/odf/odf-spark/.gitignore @@ -0,0 +1,19 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +.settings +target +.classpath +.project +.factorypath +.DS_Store http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/pom.xml ---------------------------------------------------------------------- diff --git a/odf/odf-spark/pom.xml b/odf/odf-spark/pom.xml new file mode 100755 index 0000000..378f280 --- /dev/null +++ b/odf/odf-spark/pom.xml @@ -0,0 +1,242 @@ +<?xml version="1.0"?> +<!-- +~ +~ Licensed under the Apache License, Version 2.0 (the "License"); +~ you may not use this file except in compliance with the License. +~ You may obtain a copy of the License at +~ +~ http://www.apache.org/licenses/LICENSE-2.0 +~ +~ Unless required by applicable law or agreed to in writing, software +~ distributed under the License is distributed on an "AS IS" BASIS, +~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~ See the License for the specific language governing permissions and +~ limitations under the License. 
+--> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>odf-spark</artifactId> + <packaging>jar</packaging> + <name>odf-spark</name> + <dependencies> + <dependency> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-api</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-core</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-core</artifactId> + <version>1.2.0-SNAPSHOT</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-messaging</artifactId> + <version>1.2.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-messaging</artifactId> + <version>1.2.0-SNAPSHOT</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-store</artifactId> + <version>1.2.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <!-- Workaround: Add odf-spark-example-application because dynamic jar load does not seem to work on IBM JDK --> + <dependency> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-spark-example-application</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + 
<artifactId>spark-launcher_2.11</artifactId> + <version>2.1.0</version> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.4</version> + </dependency> + <!-- The following Spark dependencies are needed for testing only. --> + <!-- Nevertheless, they have to be added as compile dependencies in order to become available to the SDPFactory. --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + <version>2.1.0</version> + <exclusions> + <exclusion> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.11</artifactId> + <version>2.1.0</version> + <exclusions> + <exclusion> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + <build> + <resources> + <resource> + <directory>${project.build.directory}/downloads</directory> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19</version> + <configuration> + <systemPropertyVariables> + <odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect> + <odf.logspec>${odf.unittest.logspec}</odf.logspec> + <odf.build.project.name>${project.name}</odf.build.project.name> + </systemPropertyVariables> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <id>download-jar-file</id> + 
<phase>validate</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <artifactItems> + <artifactItem> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-api</artifactId> + <version>1.2.0-SNAPSHOT</version> + <type>jar</type> + <overWrite>true</overWrite> + <outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-spark-example-application</artifactId> + <version>1.2.0-SNAPSHOT</version> + <type>jar</type> + <overWrite>true</overWrite> + <outputDirectory>/tmp/odf-spark</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-spark-example-application</artifactId> + <version>1.2.0-SNAPSHOT</version> + <type>jar</type> + <overWrite>true</overWrite> + <outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.wink</groupId> + <artifactId>wink-json4j</artifactId> + <version>1.4</version> + <type>jar</type> + <overWrite>true</overWrite> + <outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory> + </artifactItem> + </artifactItems> + <includes>**/*</includes> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>integration-tests</id> + <activation> + <property> + <name>reduced-tests</name> + <value>!true</value> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>2.19</version> + <configuration> + <systemPropertyVariables> + <odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect> + <odf.logspec>${odf.integrationtest.logspec}</odf.logspec> + </systemPropertyVariables> + <dependenciesToScan> + <dependency>org.apache.atlas.odf:odf-core</dependency> + 
</dependenciesToScan> + <includes> + <include>**/integrationtest/**/SparkDiscoveryServiceLocalTest.java</include> + </includes> + </configuration> + <executions> + <execution> + <id>integration-test</id> + <goals> + <goal>integration-test</goal> + </goals> + </execution> + <execution> + <id>verify</id> + <goals> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java ---------------------------------------------------------------------- diff --git a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java new file mode 100755 index 0000000..84ae80c --- /dev/null +++ b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java @@ -0,0 +1,154 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.atlas.odf.core.spark;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Constructor;
import java.text.MessageFormat;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
import org.apache.atlas.odf.api.metadata.MetadataStore;
import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
import org.apache.atlas.odf.api.spark.SparkDiscoveryService;
import org.apache.atlas.odf.api.spark.SparkServiceExecutor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.wink.json4j.JSONException;

import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint.SERVICE_INTERFACE_TYPE;
import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
import org.apache.atlas.odf.api.spark.SparkUtils;
import org.apache.atlas.odf.json.JSONUtils;

/**
 * This class calls the actual Spark discovery services depending on the type of interface they implement.
 * The class is used to run a Spark discovery service either on a local Spark cluster ({@link SparkServiceExecutorImpl})
 * or on a remote Spark cluster ({@link SparkApplicationStub}).
 *
 *
 */

public class LocalSparkServiceExecutor implements SparkServiceExecutor {
	private Logger logger = Logger.getLogger(LocalSparkServiceExecutor.class.getName());
	// Both fields are injected by the caller (see SparkServiceExecutorImpl) before use.
	private SparkSession spark;
	private MetadataStore mds;

	// Package-private: the Spark session is created and owned by the caller.
	void setSparkSession(SparkSession spark) {
		this.spark = spark;
	}

	void setMetadataStore(MetadataStore mds) {
		this.mds = mds;
	}

	/**
	 * Checks whether the given data set can be accessed by the configured service.
	 * For DataFrame-style services the check is done here by materializing the
	 * data frame; for Generic services it is delegated to the service instance.
	 * NOTE(review): the Spark session is closed in the finally block, so this
	 * executor instance can serve only a single call — confirm that is intended.
	 */
	@Override
	public DataSetCheckResult checkDataSet(DiscoveryServiceProperties dsProp, DataSetContainer container) {
		DiscoveryServiceSparkEndpoint endpoint;
		try {
			endpoint = JSONUtils.convert(dsProp.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
		} catch (JSONException e1) {
			// Endpoint JSON is malformed: not recoverable here.
			throw new RuntimeException(e1);
		}
		DataSetCheckResult checkResult = new DataSetCheckResult();
		try {
			SERVICE_INTERFACE_TYPE inputMethod = endpoint.getInputMethod();
			if (inputMethod.equals(SERVICE_INTERFACE_TYPE.DataFrame)) {
				// DataFrame services only work on relational data sets.
				MetaDataObject dataSet = container.getDataSet();
				if (!(dataSet instanceof RelationalDataSet)) {
					checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
					checkResult.setDetails("This service can only process relational data sets.");
				} else {
					checkResult.setDataAccess(DataSetCheckResult.DataAccess.Possible);
					Dataset<Row> df = SparkUtils.createDataFrame(this.spark, container, this.mds);
					// Print first rows to check whether data frame can be accessed
					df.show(10);
				}
			} else if (inputMethod.equals(SERVICE_INTERFACE_TYPE.Generic)) {
				// Generic services implement SparkDiscoveryService themselves;
				// instantiate via the no-arg constructor and delegate the check.
				Class<?> clazz = Class.forName(endpoint.getClassName());
				Constructor<?> cons = clazz.getConstructor();
				SparkDiscoveryService service = (SparkDiscoveryService) cons.newInstance();
				service.setMetadataStore(this.mds);
				service.setSparkSession(this.spark);
				checkResult = service.checkDataSet(container);
			}
		} catch (Exception e) {
			// Any failure (reflection, Spark read, ...) is reported as "not accessible".
			logger.log(Level.WARNING,"Access to data set not possible.", e);
			checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
			checkResult.setDetails(getExceptionAsString(e));
		} finally {
			this.spark.close();
			this.spark.stop();
		}
		return checkResult;
	}

	/**
	 * Runs the configured discovery service. DataFrame services are invoked via
	 * reflection on the static method processDataFrame(SparkSession, Dataset, String[]);
	 * Generic services are instantiated and called through SparkDiscoveryService.
	 * The response defaults to OK and is overwritten with UNKNOWN_ERROR on failure.
	 */
	@Override
	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceProperties dsProp, DiscoveryServiceRequest request) {
		DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
		response.setDetails("Annotations created successfully");
		response.setCode(DiscoveryServiceResponse.ResponseCode.OK);
		try {
			DiscoveryServiceSparkEndpoint endpoint = JSONUtils.convert(dsProp.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
			Class<?> clazz = Class.forName(endpoint.getClassName());
			DataSetContainer container = request.getDataSetContainer();
			String[] optionalArgs = {}; // For future use
			SERVICE_INTERFACE_TYPE inputMethod = endpoint.getInputMethod();

			if (inputMethod.equals(SERVICE_INTERFACE_TYPE.DataFrame)) {
				if (!(container.getDataSet() instanceof RelationalDataSet)) {
					throw new RuntimeException("This service can only process relational data sets (DataFile or Table).");
				}
				Dataset<Row> df = SparkUtils.createDataFrame(this.spark, container, this.mds);
				// Static method invocation: target is processDataFrame(SparkSession, Dataset, String[]).
				@SuppressWarnings("unchecked")
				Map<String, Dataset<Row>> annotationDataFrameMap = (Map<String, Dataset<Row>>) clazz.getMethod("processDataFrame", SparkSession.class, Dataset.class, String[].class).invoke(null, this.spark, df, (Object[]) optionalArgs);
				response.setResult(SparkUtils.createAnnotationsFromDataFrameMap(container, annotationDataFrameMap, this.mds));
			} else if (inputMethod.equals(SERVICE_INTERFACE_TYPE.Generic)) {
				Constructor<?> cons = clazz.getConstructor();
				SparkDiscoveryService service = (SparkDiscoveryService) cons.newInstance();
				service.setMetadataStore(this.mds);
				service.setSparkSession(this.spark);
				response = service.runAnalysis(request);
			} else {
				throw new RuntimeException(MessageFormat.format("Unsupported interface type {0}.", inputMethod));
			}
		} catch(Exception e) {
			logger.log(Level.WARNING,"Error running discovery service.", e);
			response.setDetails(getExceptionAsString(e));
			response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
		} finally {
			// The session is always torn down, even on success (single-use executor).
			this.spark.close();
			this.spark.stop();
		}
		return response;
	}

	/**
	 * Renders the full stack trace of the given throwable as a string.
	 */
	public static String getExceptionAsString(Throwable exc) {
		StringWriter sw = new StringWriter();
		PrintWriter pw = new PrintWriter(sw);
		exc.printStackTrace(pw);
		String st = sw.toString();
		return st;
	}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
----------------------------------------------------------------------
diff --git a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
new file mode 100755
index 0000000..81fea2c
--- /dev/null
+++ b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.atlas.odf.core.spark; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.text.MessageFormat; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; + +import org.apache.atlas.odf.core.Utils; + +public class SparkJars { + private static Logger logger = Logger.getLogger(SparkJars.class.getName()); + + public String getResourceAsJarFile(String resource) { + ClassLoader cl = this.getClass().getClassLoader(); + InputStream inputStream = cl.getResourceAsStream(resource); + if (inputStream == null) { + String msg = MessageFormat.format("Resource {0} was not found.", resource); + logger.log(Level.WARNING, msg); + throw new RuntimeException(msg); + } + String tempFilePath = null; + try { + File tempFile = File.createTempFile("driver", "jar"); + tempFilePath = tempFile.getAbsolutePath(); + logger.log(Level.INFO, "Creating temporary file " + tempFilePath); + IOUtils.copy(inputStream, new FileOutputStream(tempFile)); + inputStream.close(); + Utils.runSystemCommand("chmod 755 " + tempFilePath); + } catch (IOException e) { + String msg = MessageFormat.format("Error creating temporary file from resource {0}: ", resource); + logger.log(Level.WARNING, msg, e); + throw new RuntimeException(msg + Utils.getExceptionAsString(e)); + } + return tempFilePath; + } + + public String getUrlasJarFile(String urlString) { + try { + File tempFile = File.createTempFile("driver", "jar"); + logger.log(Level.INFO, "Creating temporary file " + tempFile); + FileUtils.copyURLToFile(new URL(urlString), tempFile); + Utils.runSystemCommand("chmod 755 " + tempFile.getAbsolutePath()); + return tempFile.getAbsolutePath(); + } catch (MalformedURLException e) { + String msg = MessageFormat.format("An invalid Spark application URL {0} was provided: ", 
urlString); + logger.log(Level.WARNING, msg, e); + throw new RuntimeException(msg + Utils.getExceptionAsString(e)); + } catch (IOException e) { + logger.log(Level.WARNING, "Error processing Spark application jar file.", e); + throw new RuntimeException("Error processing Spark application jar file: " + Utils.getExceptionAsString(e)); + } + } + + public byte[] getFileAsByteArray(String resourceOrURL) { + try { + InputStream inputStream; + if (isValidUrl(resourceOrURL)) { + inputStream = new URL(resourceOrURL).openStream(); + } else { + ClassLoader cl = this.getClass().getClassLoader(); + inputStream = cl.getResourceAsStream(resourceOrURL); + if (inputStream == null) { + String msg = MessageFormat.format("Resource {0} was not found.", resourceOrURL); + logger.log(Level.WARNING, msg); + throw new RuntimeException(msg); + } + } + byte[] bytes = IOUtils.toByteArray(inputStream); + return bytes; + } catch (IOException e) { + String msg = MessageFormat.format("Error converting jar file {0} into byte array: ", resourceOrURL); + logger.log(Level.WARNING, msg, e); + throw new RuntimeException(msg + Utils.getExceptionAsString(e)); + } + } + + public static boolean isValidUrl(String urlString) { + try { + new URL(urlString); + return true; + } catch (java.net.MalformedURLException exc) { + // Expected exception if URL is not valid + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java ---------------------------------------------------------------------- diff --git a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java new file mode 100755 index 0000000..720343b --- /dev/null +++ b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java @@ -0,0 +1,102 @@ +/** + * Licensed under the Apache 
License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.odf.core.spark;

import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.text.MessageFormat;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
import org.apache.atlas.odf.api.settings.SettingsManager;
import org.apache.atlas.odf.api.spark.SparkServiceExecutor;
import org.apache.spark.sql.SparkSession;
import org.apache.wink.json4j.JSONException;

import org.apache.atlas.odf.api.ODFFactory;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
import org.apache.atlas.odf.api.settings.SparkConfig;
import org.apache.atlas.odf.json.JSONUtils;

/**
 * Calls the appropriate implementation (local vs. remote) of the @link SparkServiceExecutor depending on the current @SparkConfig.
 * Prepares the local Spark cluster to be used in unit and integration tests.
 *
 *
 */

public class SparkServiceExecutorImpl implements SparkServiceExecutor {
	private Logger logger = Logger.getLogger(SparkServiceExecutorImpl.class.getName());

	// Delegates the data set check to the executor selected by getExecutor().
	@Override
	public DataSetCheckResult checkDataSet(DiscoveryServiceProperties dsri, DataSetContainer dataSetContainer) {
		return this.getExecutor(dsri).checkDataSet(dsri, dataSetContainer);
	};

	// Delegates the analysis run to the executor selected by getExecutor().
	@Override
	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceProperties dsri, DiscoveryServiceRequest request) {
		return this.getExecutor(dsri).runAnalysis(dsri, request);
	}

	/**
	 * Builds a LocalSparkServiceExecutor for the given service: creates a Spark
	 * session against the configured cluster master URL and makes the service's
	 * application jar available on the system class loader.
	 *
	 * @param dsri properties of the discovery service to run
	 * @return executor wired with the Spark session and the ODF metadata store
	 * @throws RuntimeException if the endpoint JSON is invalid, no Spark config
	 *         exists, or the application jar cannot be loaded
	 */
	private SparkServiceExecutor getExecutor(DiscoveryServiceProperties dsri) {
		SettingsManager config = new ODFFactory().create().getSettingsManager();
		DiscoveryServiceSparkEndpoint endpoint;
		try {
			endpoint = JSONUtils.convert(dsri.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
		} catch (JSONException e1) {
			// Endpoint JSON is malformed: not recoverable here.
			throw new RuntimeException(e1);
		}

		SparkConfig sparkConfig = config.getODFSettings().getSparkConfig();
		if (sparkConfig == null) {
			String msg = "No Spark service is configured. Please manually register Spark service or bind a Spark service to your ODF Bluemix app.";
			logger.log(Level.SEVERE, msg);
			throw new RuntimeException(msg);
		} else {
			logger.log(Level.INFO, "Using local Spark cluster {0}.", sparkConfig.getClusterMasterUrl());
			SparkSession spark = SparkSession.builder().master(sparkConfig.getClusterMasterUrl()).appName(dsri.getName()).getOrCreate();
			SparkJars sparkJars = new SparkJars();
			try {
				// Load jar file containing the Spark job to be started
				// NOTE(review): this casts the system class loader to URLClassLoader and
				// calls the protected addURL() via reflection — works on Java 8 only;
				// on Java 9+ the system class loader is not a URLClassLoader. Confirm
				// the targeted JVM version.
				URLClassLoader classLoader = (URLClassLoader)ClassLoader.getSystemClassLoader();
				Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
				method.setAccessible(true);
				String applicationJarFile;
				// The endpoint's jar reference may be a URL (downloaded) or a classpath resource (copied).
				if (SparkJars.isValidUrl(endpoint.getJar())) {
					applicationJarFile = sparkJars.getUrlasJarFile(endpoint.getJar());
				} else {
					applicationJarFile = sparkJars.getResourceAsJarFile(endpoint.getJar());
				}
				logger.log(Level.INFO, "Using application jar file {0}.", applicationJarFile);
				method.invoke(classLoader, new URL("file:" + applicationJarFile));
			} catch (Exception e) {
				String msg = MessageFormat.format("Error loading jar file {0} implementing the Spark discovery service: ", endpoint.getJar());
				logger.log(Level.WARNING, msg, e);
				// Tear down the session we just created before failing.
				spark.close();
				spark.stop();
				throw new RuntimeException(msg, e);
			}
			LocalSparkServiceExecutor executor = new LocalSparkServiceExecutor();
			executor.setSparkSession(spark);
			executor.setMetadataStore(new ODFFactory().create().getMetadataStore());
			return executor;
		}
	}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git a/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index
0000000..d6651ee --- /dev/null +++ b/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties @@ -0,0 +1,14 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +SparkServiceExecutor=org.apache.atlas.odf.core.spark.SparkServiceExecutorImpl http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/.gitignore ---------------------------------------------------------------------- diff --git a/odf/odf-store/.gitignore b/odf/odf-store/.gitignore new file mode 100755 index 0000000..ea5ddb8 --- /dev/null +++ b/odf/odf-store/.gitignore @@ -0,0 +1,5 @@ +.settings +target +.classpath +.project +.factorypath \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/pom.xml ---------------------------------------------------------------------- diff --git a/odf/odf-store/pom.xml b/odf/odf-store/pom.xml new file mode 100755 index 0000000..3d0a93d --- /dev/null +++ b/odf/odf-store/pom.xml @@ -0,0 +1,87 @@ +<?xml version="1.0"?> +<!-- +~ +~ Licensed under the Apache License, Version 2.0 (the "License"); +~ you may not use this file except in compliance with the License. +~ You may obtain a copy of the License at +~ +~ http://www.apache.org/licenses/LICENSE-2.0 +~ +~ Unless required by applicable law or agreed to in writing, software +~ distributed under the License is distributed on an "AS IS" BASIS, +~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+~ See the License for the specific language governing permissions and +~ limitations under the License. +--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>odf-store</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-core</artifactId> + <version>1.2.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-messaging</artifactId> + <version>1.2.0-SNAPSHOT</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.6</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.atlas.odf</groupId> + <artifactId>odf-core</artifactId> + <version>1.2.0-SNAPSHOT</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19</version> + <configuration> + <systemPropertyVariables> + <odf.logspec>${odf.unittest.logspec}</odf.logspec> + <odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect> + <odf.build.project.name>${project.name}</odf.build.project.name> + </systemPropertyVariables> + <dependenciesToScan> + <dependency>org.apache.atlas.odf:odf-core</dependency> + </dependenciesToScan> + <includes> + 
<include>**/configuration/**/*.java</include> + <include>**/ZookeeperConfigurationStorageTest.java</include> + </includes> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java ---------------------------------------------------------------------- diff --git a/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java b/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java new file mode 100755 index 0000000..3ea9927 --- /dev/null +++ b/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java @@ -0,0 +1,247 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.atlas.odf.core.store.zookeeper34;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.MessageFormat;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.atlas.odf.core.Environment;
import org.apache.atlas.odf.core.ODFInternalFactory;
import org.apache.atlas.odf.core.configuration.ConfigContainer;
import org.apache.atlas.odf.json.JSONUtils;
import org.apache.wink.json4j.JSONException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import org.apache.atlas.odf.core.store.ODFConfigurationStorage;

/**
 * {@link ODFConfigurationStorage} implementation that persists the ODF configuration
 * as a single JSON document in the Zookeeper znode {@value #ZOOKEEPER_CONFIG_PATH}.
 * The most recently read or written JSON is kept in a static String cache shared by
 * all instances in the JVM; the cache is a String (not a parsed object) so that the
 * cached state cannot be mutated by accident. All access to the cache and to the
 * pending-change set is guarded by {@link #configCacheLock}.
 */
public class ZookeeperConfigurationStorage implements ODFConfigurationStorage {
    private Logger logger = Logger.getLogger(ZookeeperConfigurationStorage.class.getName());
    static final String ZOOKEEPER_CONFIG_PATH = "/odf/config";
    static String configCache = null; // cache is a string so that the object is not accidentally modified
    static Object configCacheLock = new Object();
    static HashSet<String> pendingConfigChanges = new HashSet<String>();

    String zookeeperString;

    public ZookeeperConfigurationStorage() {
        zookeeperString = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
    }

    /** Drops the shared JSON cache so the next read goes back to Zookeeper. */
    public void clearCache() {
        synchronized (configCacheLock) {
            configCache = null;
        }
    }

    /**
     * Serializes the given configuration to JSON and writes it to the config znode,
     * creating the node path first if it does not exist yet. Updates the shared cache
     * on success. A new Zookeeper connection is opened and closed per call.
     *
     * @param config configuration to persist
     * @throws RuntimeException if the connection, serialization, or write fails
     */
    @Override
    public void storeConfig(ConfigContainer config) {
        synchronized (configCacheLock) {
            ZooKeeper zk = null;
            String configTxt = null;
            try {
                configTxt = JSONUtils.toJSON(config);
                zk = getZkConnectionSynchronously();
                if (zk.exists(getZookeeperConfigPath(), false) == null) {
                    // Config node doesn't exist in zookeeper yet, create it before writing.
                    logger.log(Level.WARNING, "Zookeeper config not found - creating it before writing: {0}", configTxt);
                    initializeConfiguration(zk, configTxt);
                }
                // Version -1 deliberately ignores versioning conflicts.
                zk.setData(getZookeeperConfigPath(), configTxt.getBytes("UTF-8"), -1);
                configCache = configTxt;
            } catch (InterruptedException e) {
                // Re-assert the interrupt flag instead of swallowing it, and keep the cause.
                Thread.currentThread().interrupt();
                throw new RuntimeException("A zookeeper connection could not be established in time to write settings", e);
            } catch (KeeperException e) {
                if (Code.NONODE.equals(e.code())) {
                    // Node vanished between exists() and setData(): recreate it with the new config.
                    logger.info("Setting could not be written, the required node is not available!");
                    initializeConfiguration(zk, configTxt);
                    // initializeConfiguration wrote configTxt, so keep the cache consistent with it.
                    configCache = configTxt;
                    return;
                }
                // Should never happen: only NoNode or BadVersion are possible here, and
                // BadVersion cannot occur because the version is ignored (-1).
                throw new RuntimeException("Settings could not be written because of an unexpected Zookeeper exception", e);
            } catch (UnsupportedEncodingException e) {
                // UTF-8 is guaranteed by the JVM spec; preserve the cause anyway.
                throw new RuntimeException("A zookeeper connection could not be established because of an incorrect encoding", e);
            } catch (JSONException e) {
                throw new RuntimeException("Configuration is not valid", e);
            } finally {
                if (zk != null) {
                    try {
                        zk.close();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /**
     * Returns the current configuration, reading it from Zookeeper only when the
     * shared cache is empty. If the config znode does not exist yet, it is created
     * and seeded with {@code defaultConfiguration}.
     *
     * @param defaultConfiguration configuration to seed the store with on first use
     * @return the parsed configuration
     * @throws RuntimeException if the read fails or the stored JSON cannot be parsed
     */
    @Override
    public ConfigContainer getConfig(ConfigContainer defaultConfiguration) {
        synchronized (configCacheLock) {
            if (configCache == null) {
                ZooKeeper zk = getZkConnectionSynchronously();
                try {
                    if (zk.exists(getZookeeperConfigPath(), false) == null) {
                        // Config node doesn't exist in zookeeper yet, write the default config.
                        String defaultConfigString = JSONUtils.toJSON(defaultConfiguration);
                        logger.log(Level.WARNING, "Zookeeper config not found - creating now with default: {0}", defaultConfigString);
                        initializeConfiguration(zk, defaultConfigString);
                    }
                    byte[] configBytes = zk.getData(getZookeeperConfigPath(), true, new Stat());
                    if (configBytes != null) {
                        String configString = new String(configBytes, "UTF-8");
                        configCache = configString;
                    } else {
                        // Should never happen: the node was just created/verified above.
                        throw new RuntimeException("Zookeeper configuration was not stored");
                    }
                } catch (KeeperException e) {
                    throw new RuntimeException(MessageFormat.format("Zookeeper config could not be read, {0} Zookeeper exception occurred!", e.code().name()), e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Zookeeper config could not be read, the connection was interrupted", e);
                } catch (IOException | JSONException e) {
                    throw new RuntimeException("Zookeeper config could not be read, the file could not be parsed correctly", e);
                } finally {
                    try {
                        zk.close();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            try {
                return JSONUtils.fromJSON(configCache, ConfigContainer.class);
            } catch (JSONException e) {
                throw new RuntimeException("Cached configuration was not valid", e);
            }
        }
    }

    /**
     * Creates the config znode path node-by-node (ignoring nodes that already exist)
     * and writes the given JSON config into the leaf node.
     *
     * @param zk     open Zookeeper connection
     * @param config JSON configuration text to store
     */
    private void initializeConfiguration(ZooKeeper zk, String config) {
        try {
            if (getZookeeperConfigPath().contains("/")) {
                String[] nodes = getZookeeperConfigPath().split("/");
                StringBuilder path = new StringBuilder();
                for (String node : nodes) {
                    if (node.trim().equals("")) {
                        // Ignore empty path segments (leading slash produces one).
                        continue;
                    }
                    path.append("/" + node);
                    try {
                        zk.create(path.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (NodeExistsException ex) {
                        // Ignore if node already exists and continue with the next node.
                    }
                }
            }

            // Use version -1 to ignore versioning conflicts.
            try {
                zk.setData(getZookeeperConfigPath(), config.getBytes("UTF-8"), -1);
            } catch (UnsupportedEncodingException e) {
                // Should not happen: UTF-8 is always available.
                throw new RuntimeException(e);
            }
        } catch (KeeperException e) {
            throw new RuntimeException(MessageFormat.format("The zookeeper config could not be initialized, a Zookeeper exception of type {0} occurred!", e.code().name()), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("The zookeeper config could not be initialized, the connection got interrupted!", e);
        }
    }

    /**
     * Opens a Zookeeper connection and blocks until the session is actually
     * established (connected or connected-read-only), or fails after the wait limit.
     *
     * @return a connected {@link ZooKeeper} client; callers must close it
     * @throws RuntimeException if the connection cannot be established in time
     */
    private ZooKeeper getZkConnectionSynchronously() {
        final CountDownLatch latch = new CountDownLatch(1);
        logger.log(Level.FINE, "Trying to connect to zookeeper at {0}", zookeeperString);
        ZooKeeper zk = null;
        try {
            int timeout = 5;
            zk = new ZooKeeper(zookeeperString, timeout * 1000, new Watcher() {

                @Override
                public void process(WatchedEvent event) {
                    if (event.getState().equals(Watcher.Event.KeeperState.ConnectedReadOnly) || event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
                        // Connected successfully to zookeeper -> release the waiting thread.
                        latch.countDown();
                    }
                }
            });
            // Block this thread until the watcher fires, for at most 5 * timeout seconds.
            latch.await(5 * timeout, TimeUnit.SECONDS);
            if (latch.getCount() > 0) {
                // Timed out without connecting: release the half-open client before failing.
                zk.close();
                throw new RuntimeException("The zookeeper connection could not be retrieved on time!");
            }
            return zk;
        } catch (IOException e1) {
            throw new RuntimeException("The zookeeper connection could not be retrieved, the connection failed!", e1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Zookeeper connection could not be retrieved, the thread was interrupted!", e);
        }
    }

    /** Path of the znode holding the JSON configuration; overridable for tests. */
    public String getZookeeperConfigPath() {
        return ZOOKEEPER_CONFIG_PATH;
    }

    /** Replaces the shared cache with the JSON form of an externally changed config. */
    @Override
    public void onConfigChange(ConfigContainer container) {
        synchronized (configCacheLock) {
            try {
                configCache = JSONUtils.toJSON(container);
            } catch (JSONException e) {
                throw new RuntimeException("Config could not be cloned!", e);
            }
        }
    }

    /** Records a change id whose propagation is still in flight. */
    @Override
    public void addPendingConfigChange(String changeId) {
        synchronized (configCacheLock) {
            pendingConfigChanges.add(changeId);
        }
    }

    /** Removes a change id once its propagation has completed. */
    @Override
    public void removePendingConfigChange(String changeId) {
        synchronized (configCacheLock) {
            pendingConfigChanges.remove(changeId);
        }
    }

    /** Returns whether the given change id is still pending. */
    @Override
    public boolean isConfigChangePending(String changeId) {
        synchronized (configCacheLock) {
            return pendingConfigChanges.contains(changeId);
        }
    }
}
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir=/tmp/odf-embedded-test-kafka/zookeeper +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 + http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties ---------------------------------------------------------------------- diff --git a/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties new file mode 100755 index 0000000..65a7b5d --- /dev/null +++ b/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties @@ -0,0 +1,14 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +ODFConfigurationStorage=org.apache.atlas.odf.core.store.zookeeper34.ZookeeperConfigurationStorage
