http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/markdown/spark-discovery-service-tutorial.md
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/markdown/spark-discovery-service-tutorial.md 
b/odf/odf-doc/src/site/markdown/spark-discovery-service-tutorial.md
new file mode 100755
index 0000000..45f272b
--- /dev/null
+++ b/odf/odf-doc/src/site/markdown/spark-discovery-service-tutorial.md
@@ -0,0 +1,206 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# Tutorial: Creating Spark discovery services
+
+This tutorial shows how to turn an existing [Apache Spark][1] application into an ODF discovery service of type *Spark*. The tutorial is based on the Spark *summary statistics* example application provided with ODF in project `odf-spark-example-application`. It invokes the `describe()` method of the Spark [Dataset][2] class, which calculates basic summary statistics on a Spark data frame.
+
+## Introduction
+
+ODF supports Spark applications implemented in Java or Scala. In order to be used as an ODF discovery service, a Spark application must implement one of the following two interfaces:
+
+* **DataFrame** - intended for Spark applications that process relational 
tables by using Spark data frames internally.
+* **Generic** - intended for applications that need the full flexibility of 
ODF.
+
+Both interfaces require the Spark application to implement a specific method (or methods) that ODF calls to run the discovery service. This method takes the current Spark context and the data set to be processed as input parameters and returns the annotations to be created. The two interface types are described in detail in separate sections below.
+
+Spark discovery services must be packaged into a single application jar file that contains all required dependencies. Spark libraries, drivers for data access, and the required ODF jar files are implicitly provided by ODF and do not need to be packaged into the application jar file. The jar file may be renamed to *zip* by replacing its extension (not by zipping the jar file) in order to avoid possible security issues when making the file available through tools like [box](https://box.com).
+
+### Configure an ODF Spark cluster
+
+ODF supports access to a local Spark cluster, which can be configured in the `sparkConfig` section of the ODF settings using the ODF REST API or the ODF web application. The parameter `clusterMasterUrl` must point to the master URL of your Spark cluster, e.g. `spark://dyn-9-152-202-64.boeblingen.de.ibm.com:7077`. An optional set of [Spark configuration options](http://spark.apache.org/docs/latest/configuration.html) can be set in the `configs` parameter by providing appropriate name-value pairs. The ODF test environment comes with a ready-to-use local Spark cluster running on your local system. It can be monitored at the URL `http://localhost:8080/`.
+
+### Registering a Spark service
+
+A Spark discovery service can be registered using the *Services* tab of the admin web application or the `/services` endpoint of the [ODF REST API](../swagger/ext-services.html). The following parameters need to be specified to register a service. You may use the following example values to register your own instance of the *summary statistics* discovery service:
+
+* Name of the discovery service: `Spark Summary Statistics`
+* Description: `Calculates summary statistics for a given table or data file.`
+* Unique service ID: `spark-summary-statistics`
+* URL of application jar file (may be renamed to zip): `file:///tmp/odf-spark/odf-spark-example-application-1.2.0-SNAPSHOT.jar` (update the link to point to the correct location of the file)
+* Name of entry point to be called: 
`org.apache.atlas.odf.core.spark.SummaryStatistics`
+* Service interface type: `DataFrame`
+
+To try out the *generic* interface, specify entry point `org.apache.atlas.odf.spark.SparkDiscoveryServiceExample` and service interface type `Generic`.
+
+### Testing the Spark service
+
+In order to test the Spark service, you can use the *DataSets* tab of the ODF admin web application. Click *START ANALYSIS* next to a relational data set (data file or relational table), then select the newly registered Spark discovery service and click *SUBMIT*. You can browse the resulting annotations by searching for the name of the annotation type in the Atlas metadata repository. The example service creates two types of annotations, *SummaryStatisticsAnnotation* and *SparkTableAnnotation*. *SummaryStatisticsAnnotation* annotates data set columns with the five attributes `count`, `mean`, `stddev`, `min`, and `max`, which represent basic statistics of the data set. *SparkTableAnnotation* annotates the data set with a single attribute `count` that represents the number of rows of the data set.
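To make the five attributes concrete, here is a small plain-Java sketch, not the ODF/Spark code path, of the statistics that *SummaryStatisticsAnnotation* carries for one column of values. The standard deviation uses the sample formula (n - 1 denominator), as Spark's `describe()` does:

```java
import java.util.Arrays;

// Plain-Java illustration of the five per-column statistics
// (count, mean, stddev, min, max). This is a sketch of the math only,
// not the ODF or Spark implementation.
public class ColumnStats {

    public static double[] describeColumn(double[] values) {
        double count = values.length;
        double mean = Arrays.stream(values).average().orElse(Double.NaN);
        // sample standard deviation: divide squared deviations by (n - 1)
        double sumSq = Arrays.stream(values).map(v -> (v - mean) * (v - mean)).sum();
        double stddev = count > 1 ? Math.sqrt(sumSq / (count - 1)) : Double.NaN;
        double min = Arrays.stream(values).min().orElse(Double.NaN);
        double max = Arrays.stream(values).max().orElse(Double.NaN);
        return new double[] { count, mean, stddev, min, max };
    }

    public static void main(String[] args) {
        // hypothetical values for an AGE column
        double[] age = { 17.0, 54.0, 91.0 };
        System.out.println(Arrays.toString(describeColumn(age)));
    }
}
```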
+
+### Developing Spark discovery services
+
+When developing a new discovery service, you may use project `odf-spark-example-application` as a template. Rather than testing your service interactively using the ODF admin web application, it is recommended to create a new test case in class `SparkDiscoveryServiceTest` of project `odf-core`. Two methods need to be added: one for describing the service, the other for running the actual test.
+
+The method that describes the service contains essentially the same parameters that need to be specified when adding a service through the admin webapp. The jar file must be specified as a URL, which may point to a local file:
+
+       public static DiscoveryServiceRegistrationInfo getSparkSummaryStatisticsService() {
+               DiscoveryServiceRegistrationInfo regInfo = new DiscoveryServiceRegistrationInfo();
+               regInfo.setId("spark-summary-statistics-example-service");
+               regInfo.setName("Spark summary statistics service");
+               regInfo.setDescription("Example discovery service calling summary statistics Spark application");
+               regInfo.setIconUrl("spark.png");
+               regInfo.setLink("http://www.spark.apache.org");
+               regInfo.setParallelismCount(2);
+               DiscoveryServiceSparkEndpoint endpoint = new DiscoveryServiceSparkEndpoint();
+               endpoint.setJar("file:/tmp/odf-spark-example-application-1.2.0-SNAPSHOT.jar");
+               endpoint.setClassName("org.apache.atlas.odf.core.spark.SummaryStatistics");
+               endpoint.setInputMethod(SERVICE_INTERFACE_TYPE.DataFrame);
+               regInfo.setEndpoint(endpoint);
+               return regInfo;
+       }
+
+The method that runs the actual test retrieves the service description from 
the above method and specifies what type of data set should be used for testing 
(data file vs. relational table) and what types of annotations are created by 
the discovery service. The test automatically applies the required 
configurations, runs the service, and checks whether new annotations of the 
respective types have been created. In order to speed up processing, the 
existing test can be temporarily commented out.  
+
+       @Test
+       public void testLocalSparkClusterWithLocalDataFile() throws Exception {
+               runSparkServiceTest(
+                       getLocalSparkConfig(),
+                       DATASET_TYPE.DataFile,
+                       getSparkSummaryStatisticsService(),
+                       new String[] { "SparkSummaryStatisticsAnnotation", "SparkTableAnnotation" }
+               );
+       }
+
+For compiling the test case, the `odf-core` project needs to be built:
+
+       cd ~/git/shared-discovery-platform/odf-core
+       mvn clean install -DskipTests
+
+The test is started implicitly when building the `odf-spark` project.
+
+       cd ~/git/shared-discovery-platform/odf-spark
+       mvn clean install
+
+If something goes wrong, debugging information will be printed to stdout during the test. To speed up the build and test process, the option `-Duse.running.atlas` may be added to the two `mvn` commands. This way, a running Atlas instance will be used instead of starting a new instance every time.
+
+
+### Troubleshooting
+
+Before registering a Spark application as a new ODF discovery service, it is highly recommended to test the application interactively using the `spark-submit` tool and to check whether the application implements the required interfaces and produces the expected output format. If the execution of a Spark discovery service fails, you can browse the ODF log for additional information.
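As an illustration, an interactive test run could look like the following. The class and jar names are taken from this tutorial; the local master URL and the assumption that your application also provides a `main` method for standalone testing are hypothetical:

```shell
# Hypothetical standalone test of the example application outside ODF
spark-submit \
  --class org.apache.atlas.odf.core.spark.SummaryStatistics \
  --master "local[*]" \
  /tmp/odf-spark/odf-spark-example-application-1.2.0-SNAPSHOT.jar
```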
+
+## DataFrame interface
+
+The ODF *DataFrame* interface for Spark discovery services has a number of advantages that make it easy to turn an existing Spark application into an ODF discovery service:
+
+* No dependencies on the ODF code, except that a specific method needs to be implemented.
+* No need to care about data access because the data set to be analyzed is 
provided as Spark data frame.
+* Easy creation of annotations by returning "annotation data frames".   
+
+The simplicity of the DataFrame interface leads to a number of restrictions:
+
+* Only relational data sets can be processed, i.e. data files (OMDataFile) and 
relational tables (OMTable).
+* Annotations may only consist of a flat list of attributes that represent 
simple data types, i.e. data structures and references to other data sets are 
not supported.  
+* Annotations may only be attached to the analyzed relational data set as well 
as to its columns.
+
+### Method to be implemented
+
+In order to implement the DataFrame interface, the Spark application must 
implement the following method:
+
+       public static Map<String,Dataset<Row>> processDataFrame(JavaSparkContext sc, DataFrame df, String[] args)
+
+The parameters to be provided to the Spark application are:
+
+* **sc**: The Spark context to be used by the Spark application for performing 
all Spark operations.
+* **df**: The data set to be analyzed represented by a Spark data frame.
+* **args**: Optional arguments for future use.
+
+### Expected output
+
+The result to be provided by the Spark application must be of type 
`Map<String,Dataset<Row>>` where `String` represents the type of the annotation 
to be created and `Dataset<Row>` represents the *annotation data frame* that 
defines the annotations to be created. If the annotation type does not yet 
exist, a new annotation type will be dynamically created based on the 
attributes of the annotation data frame.
+
+The following example describes the format of the annotation data frame. The example uses the BankClientsShort data file provided with ODF. It contains 16 columns with numeric values that represent characteristics of bank clients:
+
+CUST_ID | ACQUIRED | FIRST_PURCHASE_VALUE | CUST_VALUE_SCORE | DURATION_OF_ACQUIRED | CENSOR | ACQ_EXPENSE | ACQ_EXPENSE_SQ | IN_B2B_INDUSTRY | ANNUAL_REVENUE_MIL | TOTAL_EMPLOYEES | RETAIN_EXPENSE | RETAIN_EXPENSE_SQ | CROSSBUY | PURCHASE_FREQ | PURCHASE_FREQ_SQ
+---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
+481 | 0 | 0.0 | 0.0000 | 0 | 0 | 382.32 | 146168.58 | 0 | 56.51 | 264 | 0.00 | 0.0 | 0 | 0 | 0
+482 | 1 | 249.51 | 59.248 | 730 | 1 | 586.61 | 344111.29 | 1 | 35.66 | 355 | 1508.16 | 2274546.59 | 2 | 3 | 9
+483 | 0 | 0.0 | 0.0000 | 0 | 0 | 444.61 | 197678.05 | 1 | 40.42 | 452 | 0.00 | 0.0 | 0 | 0 | 0
+484 | 1 | 351.41 | 77.629 | 730 | 1 | 523.10 | 273633.61 | 1 | 56.36 | 320 | 2526.72 | 6384313.96 | 3 | 12 | 144
+485 | 1 | 460.04 | 76.718 | 730 | 1 | 357.78 | 128006.53 | 1 | 23.53 | 1027 | 2712.48 | 7357547.75 | 2 | 13 | 169
+486 | 1 | 648.6 | 0.0000 | 701 | 0 | 719.61 | 517838.55 | 0 | 59.97 | 1731 | 1460.64 | 2133469.21 | 5 | 11 | 121
+487 | 1 | 352.84 | 63.370 | 730 | 1 | 593.44 | 352171.03 | 1 | 45.08 | 379 | 1324.62 | 1754618.14 | 4 | 8 | 64
+488 | 1 | 193.18 | 0.0000 | 289 | 0 | 840.30 | 706104.09 | 0 | 35.95 | 337 | 1683.83 | 2835283.47 | 6 | 12 | 144
+489 | 1 | 385.14 | 0.0000 | 315 | 0 | 753.13 | 567204.80 | 0 | 58.85 | 745 | 1214.99 | 1476200.7 | 1 | 12 | 144
+
+When applying the *Spark Summary Statistics* service to the table, two annotation data frames will be returned by the service, one for the *SparkSummaryStatistics* and one for the *SparkTableAnnotation* annotation type. The data frame returned for the *SparkSummaryStatistics* annotation type consists of one column for each attribute of the annotation. In the example, the attributes are `count`, `mean`, `stddev`, `min`, and `max`, standing for the count of values, the mean value, the standard deviation, and the minimum and maximum value of each column. Each row represents one annotation to be created. The first column `ODF_ANNOTATED_COLUMN` identifies the column of the input data frame to which the annotation should be assigned.
+
+ODF_ANNOTATED_COLUMN | count | mean | stddev | min | max
+---|---|---|---|---|---
+CLIENT_ID | 499.0 | 1764.374749498998 | 108.14436025195488 | 1578.0 | 1951.0
+AGE | 499.0 | 54.65130260521042 | 19.924220223453258 | 17.0 | 91.0
+NBR_YEARS_CLI | 499.0 | 16.847695390781563 | 10.279080097460023 | 0.0 | 48.0
+AVERAGE_BALANCE | 499.0 | 17267.25809619238 | 30099.68272689043 | -77716.0 | 294296.0
+ACCOUNT_ID | 499.0 | 126814.4749498998 | 43373.557241804665 | 101578.0 | 201950.0
+
+If there is no (first) column named `ODF_ANNOTATED_COLUMN`, the annotations 
will be assigned to the data set rather than to its columns. The following 
example annotation data frame of type *SparkTableAnnotation* assigns a single 
attribute `count` to the data set:
+
+| count |
+|-------|
+| 499   |
+
+### Example implementation
+
+The implementation of the *summary statistics* discovery service may be used as a reference implementation for the DataFrame interface. It is available in class `SummaryStatistics` of project `odf-spark-example-application`.
+
+## Generic interface
+
+The *generic* interface provides the full flexibility of ODF discovery 
services implemented in Java (or Scala):
+
+* No restrictions regarding the types of data sets to be analyzed.
+* Arbitrary objects may be annotated because references to arbitrary objects may be retrieved from the metadata catalog.
+* Annotations may contain nested structures of data types and references to 
arbitrary objects.
+
+On the downside, the generic interface may be slightly more difficult to use 
than the DataFrame interface:
+
+* The discovery service must implement a specific ODF interface.
+* Spark RDDs, data frames, etc. must be explicitly constructed (helper methods are available in class `SparkUtils`).
+* Resulting annotations must be explicitly constructed and linked to the annotated objects.
+
+### Methods to be implemented
+
+The Spark application must implement the `SparkDiscoveryService` interface 
available in ODF project `odf-core-api`:
+
+       public class SparkDiscoveryServiceExample extends SparkDiscoveryServiceBase implements SparkDiscoveryService
+
+The interface consists of the following two methods, which are described in detail in the [Javadoc for ODF services](./apidocs/index.html). For convenience, the `SparkDiscoveryServiceBase` class can be extended, as the `SparkDiscoveryService` interface has many more methods.
+
+#### Actual discovery service logic
+
+This method is called to run the actual discovery service.
+
+       DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request)
+
+#### Validation whether data set can be accessed
+
+This method is called internally before running the actual discovery service.
+
+       DataSetCheckResult checkDataSet(DataSetContainer dataSetContainer)
+
+### Example implementation
+
+Class `SparkDiscoveryServiceExample` in project `odf-spark-example-application` provides an example implementation of a *generic* discovery service. It provides an alternative implementation of the *summary statistics* discovery service.
+
+  [1]: http://spark.apache.org/
+  [2]: http://spark.apache.org/docs/latest/api/java/index.html

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/markdown/test-env.md
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/markdown/test-env.md 
b/odf/odf-doc/src/site/markdown/test-env.md
new file mode 100755
index 0000000..a0697bf
--- /dev/null
+++ b/odf/odf-doc/src/site/markdown/test-env.md
@@ -0,0 +1,84 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# Test environment
+
+The odf-test-env archive contains a simple test environment for ODF.
+It contains all components to run a simple ODF installation, namely
+
+- Apache Kafka
+- Apache Atlas
+- Jetty (to host the ODF web app)
+- Apache Spark
+
+The test environment is available on Linux and Windows.
+
+## Before you start
+
+Make sure that
+
+- The python executable of Python 2.7 is in your path
+- The environment variable JAVA_HOME is set and points to a proper JDK (not 
just a JRE!)
+
+
+## *Fast path*: Download and install ODF test environment
+
+If you are running on Linux, you can download and install the latest ODF test environment by downloading the script `download-install-odf-testenv.sh` from
+<a href="https://shared-discovery-platform-jenkins.swg-devops.com:8443/view/1-ODF/job/Open-Discovery-Framework/lastSuccessfulBuild/artifact/odf-test-env/src/main/scripts/download-install-odf-testenv.sh">here</a>.
+
+If you call the script with no parameters, it will download, install and start 
the latest version of the test env.
+The default unpack directory is `~/odf-test-env`.
+
+## Download the test environment manually
+
+You can get the latest version of the test environment from Jenkins
+<a href="https://shared-discovery-platform-jenkins.swg-devops.com:8443/view/1-ODF/job/Open-Discovery-Framework/lastSuccessfulBuild/artifact/odf-test-env/target/odf-test-env-0.1.0-SNAPSHOT-bin.zip">here</a>.
+
+## Running the test environment
+
+To start the test environment on Linux, run the script ``odftestenv.sh start``. The script will start four background processes (Zookeeper, Kafka, Atlas, Jetty). To stop the test environment, use the script ``odftestenv.sh stop``.
+
+To start the test environment on Windows, run the script ``start-odf-testenv.bat``.
+This will open four command windows (Zookeeper, Kafka, Atlas, Jetty) with respective window titles. To stop the test environment, close all these windows. Note that the `HADOOP_HOME` environment variable needs to be set on Windows as described in the [build documentation](build.md).
+
+
+Once the servers are up and running you will reach the ODF console at
+[https://localhost:58081/odf-web-0.1.0-SNAPSHOT](https://localhost:58081/odf-web-0.1.0-SNAPSHOT).
+
+*Note*: The test environment scripts clean the Zookeeper and Kafka data before starting.
+This means in particular that the configuration will be reset every time you restart the test environment!
+
+Have fun!
+
+## Restart / cleanup
+
+On Linux, the `odftestenv.sh` script has these additional options
+
+- `cleanconfig`: Restart the test env with a clean configuration and clean 
Kafka topics
+- `cleanmetadata`: Restart with empty metadata
+- `cleanall`: Both `cleanconfig` and `cleanmetadata`.
+
+
+## Additional Information
+
+### Deploying a new version of the ODF war
+Once started, you can hot-deploy a new version of the ODF war file simply by copying it
+to the ``odfjettybase/webapps`` folder, even while the test environment's Jetty instance is running.
+Note that it may take a couple of seconds before the new app is available.
+
+If you have the ODF build set up, you may want to use the ``deploy-odf-war.bat/.sh`` script for this.
+You must first edit the environment variable ``ODF_GIT_DIR`` in this script to point to your local build directory.

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/markdown/troubleshooting.md
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/markdown/troubleshooting.md 
b/odf/odf-doc/src/site/markdown/troubleshooting.md
new file mode 100755
index 0000000..971bfc6
--- /dev/null
+++ b/odf/odf-doc/src/site/markdown/troubleshooting.md
@@ -0,0 +1,123 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# Troubleshooting
+
+## ODF
+
+### Debugging using eclipse
+
+You can run Jetty inside Eclipse using the “Eclipse Jetty Feature” 
(Eclipse -> Help -> Install New Software…).
+Then, create a new debug configuration (Run -> Debug Configurations…). Specify:
+
+WebApp tab:
+
+- Project: odf-web
+- WebApp Folder: ../../../../../odf-web/src/main/webapp
+- Context Path: /odf-web-0.1.0-SNAPSHOT
+- HTTP / HTTPS Port: 58081
+
+Arguments tab:
+
+- VM Arguments: -Dodf.zookeeper.connect=localhost:52181
+
+As the Eclipse Jetty plugin supports neither secure connections nor basic authentication, remove the `<security-constraint>`
+and `<login-config>`
+sections from the web.xml.
+The URL of the ODF web app then needs to be prefixed with http:// rather than https://.
+
+Then start Atlas and Kafka via the test environment (just comment out the line that starts Jetty, or stop it after it has been started).
+Now you can use the debug configuration in Eclipse to start ODF.
+
+See also https://ibm-analytics.slack.com/archives/shared-discovery-pltf/p1467365155000009
+
+
+### Logs and trace
+ODF uses the ``java.util.logging`` APIs, so if your runtime environment supports configuring these directly, use the respective mechanism.
+
+For runtimes that don't support this out of the box (like Jetty), you can set the JVM system property
+``odf.logspec`` to a value of the form ``<Level>,<Path>``, which advises ODF to
+write the log with logging level ``<Level>`` to the file under ``<Path>``.
+
+Example:
+
+       -Dodf.logspec=ALL,/tmp/myodflogfile.log
+
+Available log levels are the ones of java.util.logging, namely SEVERE, WARNING, INFO, FINE, FINER, FINEST,
+and ALL.
+
+
+## Atlas
+
+### Logs
+
+The logs directory contains a number of log files, together with a file called ``atlas.pid``, which
+contains the process ID of the Atlas server that is currently running.
+In case of issues, the file ``logs/application.log`` should be checked first.
+
+### Restarting Atlas
+
+Run these commands (from the Atlas installation directory) to restart Atlas:
+
+       bin/atlas_stop.py
+       bin/atlas_start.py
+
+### Clean all data
+
+To clean the Atlas repository, simply remove the directories ``data`` and 
``logs`` before starting.
+
+
+### Issues
+
+#### Service unavailable (Error 503)
+
+Sometimes, calling any Atlas REST API (or the UI) doesn't work and an HTTP error 503 is returned.
+We see this error occasionally and don't know any way to fix it except cleaning all data and restarting Atlas.
+
+
+#### Creating Atlas objects takes a long time
+
+It takes a long time to create an Atlas object, and after about a minute you see a message like this in the log:
+
+       Unable to update metadata after 60000ms
+
+This is the result of the Kafka queues (which are used for notifications) being in an error state.
+To fix this, restart Atlas (no data cleaning required).
+
+## Kafka / Zookeeper
+
+If there is a problem starting Kafka / Zookeeper check if there might be a 
port conflict due to other instances of Kafka / Zookeeper using the default 
port.
+This might be the case if a more recent version of the IS suite is installed 
on the system on which you want to run ODF.
+
+Example: If another instance of Zookeeper uses the default port 52181 you need 
to switch the Zookeeper port used by replacing 52181 with a free port number in:
+- start-odf-testenv.bat
+- kafka_2.10-0.8.2.1\config\zookeeper.properties
+- kafka_2.10-0.8.2.1\config\server.properties
+
+### Reset
+
+To reset your Zookeeper / Kafka installation, you will first have to stop the 
servers:
+
+       bin/kafka-server-stop
+       bin/zookeeper-server-stop
+
+Next, remove the Zookeeper data directory and the Kafka logs directory. Note that "logs"
+in Kafka means the actual data in the topics, not the log files.
+You can find which directories to clean in the properties ``dataDir`` in the ``zookeeper.properties``
+file and ``log.dirs`` in ``server.properties``, respectively.
+The defaults are ``/tmp/zookeeper`` and ``/tmp/kafka-logs``.
+
+Restart the servers with
+
+       bin/zookeeper-server-start config/zookeeper.properties
+       bin/kafka-server-start config/server.properties

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/pom.xml
----------------------------------------------------------------------
diff --git 
a/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/pom.xml
 
b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/pom.xml
new file mode 100755
index 0000000..e6ffb46
--- /dev/null
+++ 
b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <groupId>odf.tutorials</groupId>
+       <artifactId>odf-tutorial-discoveryservice</artifactId>
+       <version>1.2.0-SNAPSHOT</version>
+       <packaging>jar</packaging>
+
+       <name>odf-tutorial-discoveryservice</name>
+
+       <properties>
+               
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-api</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+               </dependency>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <version>4.12</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialAnnotation.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialAnnotation.java
 
b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialAnnotation.java
new file mode 100755
index 0000000..2899a53
--- /dev/null
+++ 
b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialAnnotation.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package odftutorial.discoveryservicetutorial;
+
+import org.apache.atlas.odf.core.metadata.Annotation;
+
+/*
+ * An example annotation that adds one property to the default annotation class
+ */
+public class ODFTutorialAnnotation extends Annotation {
+
+       private String tutorialProperty;
+
+       public String getTutorialProperty() {
+               return tutorialProperty;
+       }
+
+       public void setTutorialProperty(String tutorialProperty) {
+               this.tutorialProperty = tutorialProperty;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryService.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryService.java
 
b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryService.java
new file mode 100755
index 0000000..16848ec
--- /dev/null
+++ 
b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryService.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package odftutorial.discoveryservicetutorial;
+
+import java.util.Collections;
+import java.util.Date;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse.ResponseCode;
+import org.apache.atlas.odf.api.discoveryservice.SyncDiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+
+/**
+ * A simple synchronous discovery service that creates one annotation for the data set it analyzes.
+ */
+public class ODFTutorialDiscoveryService extends SyncDiscoveryServiceBase {
+
+	@Override
+	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+		// 1. create an annotation that annotates the data set object passed in the request
+		ODFTutorialAnnotation annotation = new ODFTutorialAnnotation();
+		annotation.setAnnotatedObject(request.getDataSetContainer().getDataSet().getReference());
+		// set a new property called "tutorialProperty" to some string
+		annotation.setTutorialProperty("Tutorial annotation was created on " + new Date());
+
+		// 2. create a response with our annotation created above
+		return createSyncResponse( //
+				ResponseCode.OK, // everything worked OK
+				"Everything worked", // human-readable message
+				Collections.singletonList(annotation) // new annotations
+		);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/resources/META-INF/odf/odf-services.json
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/resources/META-INF/odf/odf-services.json b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/resources/META-INF/odf/odf-services.json
new file mode 100755
index 0000000..2709548
--- /dev/null
+++ b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/resources/META-INF/odf/odf-services.json
@@ -0,0 +1,13 @@
+[
+  {
+	"id": "odftutorial.discoveryservicetutorial.ODFTutorialDiscoveryService",
+	"name": "First tutorial service",
+	"description": "The first tutorial service that is synchronous and creates just a single annotation for a data set.",
+	"deletable": true,
+	"endpoint": {
+		"runtimeName": "Java",
+		"className": "odftutorial.discoveryservicetutorial.ODFTutorialDiscoveryService"
+	},
+	"iconUrl": "https://www-03.ibm.com/ibm/history/exhibits/logo/images/920911.jpg"
+  }
+]

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/test/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/test/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryServiceTest.java b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/test/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryServiceTest.java
new file mode 100755
index 0000000..1eab53f
--- /dev/null
+++ b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/test/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryServiceTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package odftutorial.discoveryservicetutorial;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for discovery service
+ */
+public class ODFTutorialDiscoveryServiceTest {
+       
+       @Test
+       public void test() throws Exception {
+               Assert.assertTrue(true);
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/site.xml
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/site.xml b/odf/odf-doc/src/site/site.xml
new file mode 100755
index 0000000..c810e66
--- /dev/null
+++ b/odf/odf-doc/src/site/site.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project name="Open Discovery Framework">
+       <skin>
+               <groupId>org.apache.maven.skins</groupId>
+               <artifactId>maven-fluido-skin</artifactId>
+               <version>1.4</version>
+       </skin>
+       <bannerLeft>
+               <name>Open Discovery Framework</name>
+       </bannerLeft>
+       <custom>
+               <fluidoSkin>
+                       <topBarEnabled>false</topBarEnabled>
+                       <sideBarEnabled>true</sideBarEnabled>
+               </fluidoSkin>
+       </custom>
+	<body>
+		<links>
+			<item name="Apache Atlas" href="http://atlas.incubator.apache.org" />
+			<item name="Apache Kafka" href="http://kafka.apache.org" />
+		</links>
+		<menu name="Getting Started">
+			<item name="Overview" href="index.html" />
+			<item name="First Steps" href="first-steps.html" />
+			<item name="Build" href="build.html" />
+			<item name="Test Environment" href="test-env.html" />
+		</menu>
+		<menu name="Tutorials">
+			<item name="Install ODF and its prerequisites manually" href="install.html"/>
+			<item name="Run your first ODF analysis" href="first-analysis-tutorial.html"/>
+			<item name="Build and run your first Discovery Service" href="discovery-service-tutorial.html"/>
+			<item name="Creating Spark discovery services" href="spark-discovery-service-tutorial.html"/>
+		</menu>
+		<menu name="Reference">
+			<item name="ODF Metadata API" href="odf-metadata-api.html" />
+			<item name="API reference" href="api-reference.html" />
+			<item name="Troubleshooting" href="troubleshooting.html" />
+		</menu>
+		<menu name="Customization">
+			<item name="Discovery Services" href="discovery-services.html" />
+			<item name="Data Model" href="data-model.html" />
+		</menu>
+		<menu name="Internal">
+			<item name="Jenkins build" href="jenkins-build.html" />
+		</menu>
+		<footer>All rights reserved.</footer>
+	</body>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/.gitignore
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/.gitignore b/odf/odf-messaging/.gitignore
new file mode 100755
index 0000000..94858e5
--- /dev/null
+++ b/odf/odf-messaging/.gitignore
@@ -0,0 +1,19 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+.settings
+target
+.classpath
+.project
+.factorypath
+derby.log

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/pom.xml b/odf/odf-messaging/pom.xml
new file mode 100755
index 0000000..95f9d44
--- /dev/null
+++ b/odf/odf-messaging/pom.xml
@@ -0,0 +1,208 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+       <artifactId>odf-messaging</artifactId>
+       <name>odf-messaging</name>
+
+       <parent>
+               <groupId>org.apache.atlas.odf</groupId>
+               <artifactId>odf</artifactId>
+               <version>1.2.0-SNAPSHOT</version>
+       </parent>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-api</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+                       <scope>compile</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-core</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+                       <scope>compile</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-clients</artifactId>
+                       <version>0.10.0.0</version>
+                       <scope>compile</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka_2.11</artifactId>
+                       <version>0.10.0.0</version>
+                       <scope>compile</scope>
+               </dependency>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <version>4.12</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-core</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.derby</groupId>
+                       <artifactId>derby</artifactId>
+                       <version>10.12.1.1</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <version>2.6</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+						<configuration>
+							<!-- remove implementations properties file from the test jar -->
+							<excludes>
+								<exclude>org/apache/atlas/odf/odf-implementation.properties</exclude>
+							</excludes>
+						</configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+       <profiles>
+               <profile>
+                       <id>all-unit-tests</id>
+                       <activation>
+                               <activeByDefault>true</activeByDefault>
+                       </activation>
+                       <build>
+                               <plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-surefire-plugin</artifactId>
+						<version>2.19</version>
+						<configuration>
+							<systemPropertyVariables>
+								<odf.logspec>${odf.unittest.logspec}</odf.logspec>
+								<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+								<odf.build.project.name>${project.name}</odf.build.project.name>
+
+								<!-- additional properties for the test services -->
+								<asynctestservice.testparam>sometestvalueforasync</asynctestservice.testparam>
+								<synctestservice.testparam>sometestvalueforsync</synctestservice.testparam>
+							</systemPropertyVariables>
+							<dependenciesToScan>
+								<dependency>org.apache.atlas.odf:odf-core</dependency>
+							</dependenciesToScan>
+							<!--
+							<includes><include>**ShutdownTest**</include></includes>
+							-->
+							<excludes>
+								<exclude>**/integrationtest/**</exclude>
+								<exclude>**/configuration/**</exclude>
+							</excludes>
+						</configuration>
+					</plugin>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-failsafe-plugin</artifactId>
+						<version>2.19</version>
+						<configuration>
+							<systemPropertyVariables>
+								<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+								<odf.logspec>${odf.integrationtest.logspec}</odf.logspec>
+							</systemPropertyVariables>
+							<dependenciesToScan>
+								<dependency>org.apache.atlas.odf:odf-core</dependency>
+							</dependenciesToScan>
+							<includes>
+								<include>**/integrationtest/**/**.java</include>
+							</includes>
+							<excludes>
+								<exclude>**/integrationtest/**/SparkDiscoveryService*</exclude>
+								<exclude>**/integrationtest/**/AnalysisManagerTest.java</exclude>
+							</excludes>
+						</configuration>
+						<executions>
+							<execution>
+								<id>integration-test</id>
+								<goals>
+									<goal>integration-test</goal>
+								</goals>
+							</execution>
+							<execution>
+								<id>verify</id>
+								<goals>
+									<goal>verify</goal>
+								</goals>
+							</execution>
+						</executions>
+					</plugin>
+                               </plugins>
+                       </build>
+               </profile>
+               <profile>
+                       <id>reduced-tests</id>
+                       <activation>
+                               <property>
+                                       <name>reduced-tests</name>
+                                       <value>true</value>
+                               </property>
+                       </activation>
+                       <build>
+                               <plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-surefire-plugin</artifactId>
+						<version>2.19</version>
+						<configuration>
+							<systemPropertyVariables>
+								<odf.logspec>${odf.unittest.logspec}</odf.logspec>
+								<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+								<odf.build.project.name>${project.name}</odf.build.project.name>
+							</systemPropertyVariables>
+							<dependenciesToScan>
+								<dependency>org.apache.atlas.odf:odf-core</dependency>
+							</dependenciesToScan>
+							<excludes>
+								<exclude>**/KafkaQueueManagerTest.java</exclude>
+								<exclude>**/ShutdownTest.java</exclude>
+								<exclude>**/MultiPartitionConsumerTest.java</exclude>
+								<exclude>**/integrationtest/**/SparkDiscoveryService*</exclude>
+								<exclude>**/integrationtest/**/AnalysisManagerTest.java</exclude>
+								<exclude>**/configuration/**</exclude>
+							</excludes>
+						</configuration>
+					</plugin>
+                               </plugins>
+                       </build>
+               </profile>
+       </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaMonitor.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaMonitor.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaMonitor.java
new file mode 100755
index 0000000..c9c95cc
--- /dev/null
+++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaMonitor.java
@@ -0,0 +1,545 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.atlas.odf.api.engine.BrokerNode;
+import org.apache.atlas.odf.api.engine.KafkaBrokerPartitionMessageCountInfo;
+import org.apache.atlas.odf.api.engine.KafkaPartitionInfo;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
+import org.apache.atlas.odf.json.JSONUtils;
+
+import kafka.admin.AdminClient;
+import kafka.admin.AdminClient.ConsumerSummary;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.GroupCoordinatorRequest;
+import kafka.api.GroupCoordinatorResponse;
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.cluster.BrokerEndPoint;
+import kafka.cluster.EndPoint;
+import kafka.common.ErrorMapping;
+import kafka.common.OffsetAndMetadata;
+import kafka.common.OffsetMetadata;
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.coordinator.GroupOverview;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetCommitRequest;
+import kafka.javaapi.OffsetCommitResponse;
+import kafka.javaapi.OffsetFetchRequest;
+import kafka.javaapi.OffsetFetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+import kafka.network.BlockingChannel;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+public class KafkaMonitor {
+       private final static String CLIENT_ID = "odfMonitorClient";
+
+       private Logger logger = Logger.getLogger(KafkaMonitor.class.getName());
+
+	// This only works for consumer groups managed by the Kafka coordinator
+	// (unlike Kafka < 0.9, where consumers were managed by ZooKeeper).
+	public List<String> getConsumerGroups(String zookeeperHost, String topic) {
+		List<String> result = new ArrayList<String>();
+		try {
+			List<String> brokers = getBrokers(zookeeperHost);
+			StringBuilder brokersParam = new StringBuilder();
+			final Iterator<String> iterator = brokers.iterator();
+			while (iterator.hasNext()) {
+				brokersParam.append(iterator.next());
+				if (iterator.hasNext()) {
+					brokersParam.append(";");
+				}
+			}
+			Properties props = new Properties();
+			props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokersParam.toString());
+			final AdminClient client = AdminClient.create(props);
+			final Map<Node, scala.collection.immutable.List<GroupOverview>> javaMap = JavaConversions.mapAsJavaMap(client.listAllConsumerGroups());
+			for (Entry<Node, scala.collection.immutable.List<GroupOverview>> entry : javaMap.entrySet()) {
+				for (GroupOverview group : JavaConversions.seqAsJavaList(entry.getValue())) {
+					for (ConsumerSummary summary : JavaConversions.seqAsJavaList(client.describeConsumerGroup(group.groupId()))) {
+						for (TopicPartition part : JavaConversions.seqAsJavaList(summary.assignment())) {
+							if (part.topic().equals(topic) && !result.contains(group.groupId())) {
+								result.add(group.groupId());
+								break;
+							}
+						}
+					}
+				}
+			}
+		} catch (Exception ex) {
+			logger.log(Level.WARNING, "An error occurred while retrieving the consumer groups", ex);
+		}
+		return result;
+	}
+
+	private ZkUtils getZkUtils(String zookeeperHost, ZkClient zkClient) {
+		return new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false);
+	}
+
+	private ZkClient getZkClient(String zookeeperHost) {
+		return new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+	}
+
+	public boolean setOffset(String zookeeperHost, String consumerGroup, String topic, int partition, long offset) {
+		logger.info("set offset for " + consumerGroup + " " + offset);
+		long now = System.currentTimeMillis();
+		Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
+		final TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+		offsets.put(topicAndPartition, new OffsetAndMetadata(new OffsetMetadata(offset, "Manually set offset"), now, -1));
+		int correlationId = 0;
+		OffsetCommitRequest req = new OffsetCommitRequest(consumerGroup, offsets, correlationId++, CLIENT_ID, (short) 1);
+		final BlockingChannel channel = getOffsetManagerChannel(zookeeperHost, consumerGroup);
+		channel.send(req.underlying());
+		OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().payload());
+		if (commitResponse.hasError()) {
+			logger.warning("Could not commit offset! " + topic + ":" + partition + "-" + offset + " error: " + commitResponse.errorCode(topicAndPartition));
+			channel.disconnect();
+			return false;
+		} else {
+			logger.info("Offset committed successfully");
+			channel.disconnect();
+			return true;
+		}
+	}
+
+	public List<String> getBrokers(String zookeeperHost) {
+		List<String> result = new ArrayList<String>();
+		ZkClient zkClient = getZkClient(zookeeperHost);
+		List<Broker> brokerList = JavaConversions.seqAsJavaList(getZkUtils(zookeeperHost, zkClient).getAllBrokersInCluster());
+		Iterator<Broker> brokerIterator = brokerList.iterator();
+		while (brokerIterator.hasNext()) {
+			for (Entry<SecurityProtocol, EndPoint> entry : JavaConversions.mapAsJavaMap(brokerIterator.next().endPoints()).entrySet()) {
+				String connectionString = entry.getValue().connectionString();
+				// remove the protocol prefix from the connection string
+				connectionString = connectionString.split("://")[1];
+				result.add(connectionString);
+			}
+		}
+		zkClient.close();
+		return result;
+	}
+
+       public PartitionOffsetInfo getOffsetsOfLastMessagesForTopic(String 
zookeeperHost, String topic, int partition) {
+               List<String> kafkaBrokers = getBrokers(zookeeperHost);
+               return getOffsetsOfLastMessagesForTopic(kafkaBrokers, topic, 
partition);
+       }
+
+       public PartitionOffsetInfo getOffsetsOfLastMessagesForTopic(final 
List<String> kafkaBrokers, final String topic, final int partition) {
+               logger.entering(this.getClass().getName(), 
"getOffsetsOfLastMessagesForTopic");
+
+               final PartitionOffsetInfo info = new PartitionOffsetInfo();
+               info.setOffset(-1L);
+               info.setPartitionId(partition);
+
+               final CountDownLatch subscribeAndPollLatch = new 
CountDownLatch(2);
+
+               final Thread consumerThread = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               Properties kafkaConsumerProps = 
getKafkaConsumerProps(kafkaBrokers);
+                               final KafkaConsumer<String, String> consumer = 
new KafkaConsumer<String, String>(kafkaConsumerProps);
+                               final TopicPartition topicPartition = new 
TopicPartition(topic, partition);
+                               consumer.subscribe(Arrays.asList(topic), new 
ConsumerRebalanceListener() {
+
+                                       @Override
+                                       public void 
onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                                               // nothing to do when partitions are revoked
+                                       }
+
+                                       @Override
+                                       public void 
onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                                               
subscribeAndPollLatch.countDown();
+                                       }
+                               });
+                               logger.info("poll records from kafka for offset 
retrieval");
+
+                               final ConsumerRecords<String, String> poll = 
consumer.poll(500);
+                               List<ConsumerRecord<String, String>> 
polledRecords = poll.records(topicPartition);
+                               logger.info("polled records: " + poll.count());
+                               if (!polledRecords.isEmpty()) {
+                                       ConsumerRecord<String, String> record = 
polledRecords.get(polledRecords.size() - 1);
+                                       info.setMessage(record.value());
+                                       info.setOffset(record.offset());
+                                       info.setPartitionId(partition);
+                                       logger.info("polled last offset: " + 
record.offset());
+                               }
+                               subscribeAndPollLatch.countDown();
+                               consumer.close();
+                       }
+               });
+               logger.info("start retrieval of offset");
+               consumerThread.start();
+
+               try {
+                       boolean result = subscribeAndPollLatch.await(5000, 
TimeUnit.MILLISECONDS);
+                       if (result) {
+                               logger.info("Subscribed and retrieved offset on 
time: " + JSONUtils.toJSON(info));
+                       } else {
+                               logger.warning("Could not subscribe and 
retrieve offset on time " + JSONUtils.toJSON(info));
+                               consumerThread.interrupt();
+                       }
+               } catch (InterruptedException e) {
+                       logger.log(Level.WARNING, "An error occurred while retrieving the last offset", e);
+               } catch (JSONException e) {
+                       logger.log(Level.WARNING, "An error occurred while retrieving the last offset", e);
+               }
+
+               return info;
+       }
+
+       protected Properties getKafkaConsumerProps(List<String> kafkaBrokers) {
+               Properties kafkaConsumerProps = new Properties();
+               kafkaConsumerProps.put("group.id", "OffsetRetrieverConsumer" + 
UUID.randomUUID().toString());
+               
kafkaConsumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+               
kafkaConsumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
+               
kafkaConsumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
+
+               StringBuilder brokers = new StringBuilder();
+               final Iterator<String> iterator = kafkaBrokers.iterator();
+               while (iterator.hasNext()) {
+                       brokers.append(iterator.next());
+                       if (iterator.hasNext()) {
+                               brokers.append(",");
+                       }
+               }
+               kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokers.toString());
+               
kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+               return kafkaConsumerProps;
+       }
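The `StringBuilder` loop in `getKafkaConsumerProps` builds the comma-separated `bootstrap.servers` value by hand. As a minimal stand-alone sketch (class name and broker strings are hypothetical), the same join can be expressed with the JDK's `String.join`:

```java
import java.util.Arrays;
import java.util.List;

public class BootstrapServersSketch {

    // Builds the comma-separated bootstrap.servers value from a broker list,
    // equivalent to the StringBuilder loop in getKafkaConsumerProps.
    static String toBootstrapServers(List<String> brokers) {
        return String.join(",", brokers);
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("kafka1:9092", "kafka2:9092");
        // prints kafka1:9092,kafka2:9092
        System.out.println(toBootstrapServers(brokers));
    }
}
```

The explicit iterator loop in the original predates no JDK facility; `String.join` (Java 8+) produces the identical string without the `hasNext()` bookkeeping.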
+
+       public List<KafkaBrokerPartitionMessageCountInfo> 
getMessageCountForTopic(String zookeeperHost, String topic) {
+               logger.entering(this.getClass().getName(), 
"getMessageCountForTopic");
+               List<KafkaBrokerPartitionMessageCountInfo> result = new 
ArrayList<KafkaBrokerPartitionMessageCountInfo>();
+
+               List<Integer> partitions = 
getPartitionIdsForTopic(zookeeperHost, topic);
+
+               List<String> kafkaBrokers = getBrokers(zookeeperHost);
+               for (int cnt = 0; cnt < kafkaBrokers.size(); cnt++) {
+                       String broker = kafkaBrokers.get(cnt);
+                       logger.info("getMessageCountForTopic from broker: " + 
broker);
+                       KafkaBrokerPartitionMessageCountInfo container = new 
KafkaBrokerPartitionMessageCountInfo();
+                       container.setBroker(broker);
+
+                       String[] splitBroker = broker.split(":");
+                       String host = splitBroker[0];
+                       String port = splitBroker[1];
+                       SimpleConsumer consumer = new SimpleConsumer(host, 
Integer.valueOf(port), 100000, 64 * 1024, "leaderLookup");
+                       Map<Integer, Long> partitionCountMap = new 
HashMap<Integer, Long>();
+
+                       for (Integer partition : partitions) {
+                               logger.info("broker: " + broker + ", partition 
" + partition);
+                               partitionCountMap.put(partition, null);
+                               FetchRequest req = new 
FetchRequestBuilder().clientId(CLIENT_ID).addFetch(topic, partition, 0, 
100000).build();
+                               FetchResponse fetchResponse = 
consumer.fetch(req);
+
+                               if (fetchResponse.hasError()) {
+                                       //in case of a broker error, do 
nothing. The broker has no information about the partition so we continue with 
the next one.
+                                       if (fetchResponse.errorCode(topic, 
partition) == ErrorMapping.NotLeaderForPartitionCode()) {
+                                               logger.info("broker " + broker 
+ " is not leader for partition " + partition + ", cannot retrieve 
MessageCountForTopic");
+                                       } else {
+                                               logger.warning("broker: " + 
broker + ", partition " + partition + " has error: " + 
fetchResponse.errorCode(topic, partition));
+                                       }
+                                       continue;
+                               }
+
+                               long numRead = 0;
+                               long readOffset = numRead;
+
+                               for (MessageAndOffset messageAndOffset : 
fetchResponse.messageSet(topic, partition)) {
+                                       long currentOffset = 
messageAndOffset.offset();
+                                       if (currentOffset < readOffset) {
+                                               logger.info("Found an old 
offset: " + currentOffset + " Expecting: " + readOffset);
+                                               continue;
+                                       }
+                                       readOffset = 
messageAndOffset.nextOffset();
+                                       numRead++;
+                               }
+
+                               logger.info("broker: " + broker + ", partition 
" + partition + " total messages: " + numRead);
+                               partitionCountMap.put(partition, numRead);
+                       }
+                       consumer.close();
+                       container.setPartitionMsgCountMap(partitionCountMap);
+                       result.add(container);
+               }
+
+               return result;
+       }
+
+       /**
+        * @param group
+        * @param topic
+        * @return a list of partitions and their offsets. If no offset is 
found, it is returned as -1
+        */
+       public List<PartitionOffsetInfo> getOffsetsForTopic(String 
zookeeperHost, String group, String topic) {
+               BlockingChannel channel = 
getOffsetManagerChannel(zookeeperHost, group);
+
+               List<Integer> partitionIds = 
getPartitionIdsForTopic(zookeeperHost, topic);
+               List<TopicAndPartition> partitions = new 
ArrayList<TopicAndPartition>();
+               int correlationId = 0;
+               for (Integer id : partitionIds) {
+                       TopicAndPartition testPartition0 = new 
TopicAndPartition(topic, id);
+                       partitions.add(testPartition0);
+               }
+
+               OffsetFetchRequest fetchRequest = new OffsetFetchRequest(group, 
partitions, (short) 1 /* version */, // version 1 and above fetch from Kafka, 
version 0 fetches from ZooKeeper
+                               correlationId++, CLIENT_ID);
+
+               List<PartitionOffsetInfo> offsetResult = new 
ArrayList<PartitionOffsetInfo>();
+               int retryCount = 0;
+               //it is possible that a ConsumerCoordinator is not available 
yet, if this is the case we need to wait and try again.
+               boolean done = false;
+               while (retryCount < 5 && !done) {
+                       offsetResult = new ArrayList<PartitionOffsetInfo>();
+                       retryCount++;
+                       channel.send(fetchRequest.underlying());
+                       OffsetFetchResponse fetchResponse = 
OffsetFetchResponse.readFrom(channel.receive().payload());
+
+                       boolean errorFound = false;
+                       for (TopicAndPartition part : partitions) {
+                               if (part.topic().equals(topic)) {
+                                       PartitionOffsetInfo offsetInfo = new 
PartitionOffsetInfo();
+                                       
offsetInfo.setPartitionId(part.partition());
+                                       OffsetMetadataAndError result = 
fetchResponse.offsets().get(part);
+                                       short offsetFetchErrorCode = 
result.error();
+                                       if (offsetFetchErrorCode == 
ErrorMapping.NotCoordinatorForConsumerCode()) {
+                                               channel.disconnect();
+                                               String msg = "Offset could not 
be fetched, the used broker is not the coordinator for this consumer";
+                                               offsetInfo.setMessage(msg);
+                                               logger.warning(msg);
+                                               errorFound = true;
+                                               break;
+                                       } else if (offsetFetchErrorCode == 
ErrorMapping.OffsetsLoadInProgressCode()) {
+                                               logger.warning("Offset could 
not be fetched at this point, the offsets are not available yet");
+                                               try {
+                                                       Thread.sleep(2000);
+                                               } catch (InterruptedException e) {
+                                                       Thread.currentThread().interrupt();
+                                               }
+                                               //Offsets are not available 
yet. Wait and try again
+                                               errorFound = true;
+                                               break;
+                                       } else if (result.error() != 
ErrorMapping.NoError()) {
+                                               String msg = MessageFormat.format("Offset could not be fetched at this point, an unknown error occurred ( {0} )", result.error());
+                                               offsetInfo.setMessage(msg);
+                                               logger.warning(msg);
+                                       } else {
+                                               long offset = result.offset();
+                                               offsetInfo.setOffset(offset);
+                                       }
+
+                                       offsetResult.add(offsetInfo);
+                               }
+                       }
+                       if (!errorFound) {
+                               done = true;
+                       }
+               }
+
+               if (channel.isConnected()) {
+                       channel.disconnect();
+               }
+               return offsetResult;
+       }
+
+       public List<TopicMetadata> getMetadataForTopic(String zookeeperHost, 
String kafkaTopic) {
+               //connecting to a single broker should be enough because every 
single broker knows everything we need
+               for (String brokerHost : getBrokers(zookeeperHost)) {
+                       brokerHost = brokerHost.replace("PLAINTEXT://", "");
+                       String[] splitBroker = brokerHost.split(":");
+                       String ip = splitBroker[0];
+                       String port = splitBroker[1];
+
+                       //it is possible that a ConsumerCoordinator is not 
available yet, if this is the case we need to wait and try again.
+                       SimpleConsumer consumer = null;
+                       try {
+                               consumer = new SimpleConsumer(ip, 
Integer.valueOf(port), 100000, 64 * 1024, "leaderLookup");
+                               int retryCount = 0;
+                               boolean done = false;
+                               while (retryCount < 5 && !done) {
+                                       retryCount++;
+
+                                       List<String> topics = 
Collections.singletonList(kafkaTopic);
+                                       TopicMetadataRequest req = new 
TopicMetadataRequest(topics);
+                                       kafka.javaapi.TopicMetadataResponse 
resp = consumer.send(req);
+                                       List<TopicMetadata> metaData = 
resp.topicsMetadata();
+
+                                       boolean errorFound = false;
+                                       for (TopicMetadata item : metaData) {
+                                               if 
(item.topic().equals(kafkaTopic)) {
+                                                       if (item.errorCode() == 
ErrorMapping.LeaderNotAvailableCode()) {
+                                                               //wait and try 
again
+                                                               errorFound = 
true;
+                                                               try {
+                                                                       Thread.sleep(2000);
+                                                               } catch (InterruptedException e) {
+                                                                       Thread.currentThread().interrupt();
+                                                               }
+                                                               break;
+                                                       }
+                                                       return metaData;
+                                               }
+                                       }
+
+                                       if (!errorFound) {
+                                               done = true;
+                                       }
+                               }
+                       } finally {
+                               if (consumer != null) {
+                                       consumer.close();
+                               }
+                       }
+               }
+               return null;
+       }
+
+       public List<Integer> getPartitionsForTopic(String zookeeperHost, String 
topic) {
+               ZkClient zkClient = new ZkClient(zookeeperHost, 5000, 5000, 
ZKStringSerializer$.MODULE$);
+               Map<String, Seq<Object>> partitions = JavaConversions
+                               .mapAsJavaMap(new ZkUtils(zkClient, new 
ZkConnection(zookeeperHost), 
false).getPartitionsForTopics(JavaConversions.asScalaBuffer(Arrays.asList(topic)).toList()));
+               List<Object> partitionObjList = 
JavaConversions.seqAsJavaList(partitions.entrySet().iterator().next().getValue());
+               List<Integer> partitionsList = new ArrayList<Integer>();
+               for (Object partObj : partitionObjList) {
+                       partitionsList.add((Integer) partObj);
+               }
+               zkClient.close();
+               return partitionsList;
+       }
+
+       public List<KafkaPartitionInfo> getPartitionInfoForTopic(String 
zookeeperHost, String topic) {
+               List<TopicMetadata> topicInfos = 
getMetadataForTopic(zookeeperHost, topic);
+               List<KafkaPartitionInfo> partitionInfoList = new 
ArrayList<KafkaPartitionInfo>();
+               for (TopicMetadata topicInfo : topicInfos) {
+                       for (PartitionMetadata part : 
topicInfo.partitionsMetadata()) {
+                               KafkaPartitionInfo info = new 
KafkaPartitionInfo();
+                               info.setPartitionId(part.partitionId());
+
+                               List<BrokerNode> partitionNodes = new 
ArrayList<BrokerNode>();
+                               for (BrokerEndPoint brokerPoint : part.isr()) {
+                                       BrokerNode node = new BrokerNode();
+                                       
node.setHost(brokerPoint.connectionString());
+                                       
node.setLeader(brokerPoint.connectionString().equals(part.leader().connectionString()));
+                                       partitionNodes.add(node);
+                               }
+                               info.setNodes(partitionNodes);
+                               partitionInfoList.add(info);
+                       }
+               }
+               //partitionInformation is collected, end loop and return
+               return partitionInfoList;
+       }
+
+       public List<Integer> getPartitionIdsForTopic(String zookeeperHost, 
String topic) {
+               List<TopicMetadata> metadata = 
getMetadataForTopic(zookeeperHost, topic);
+
+               List<Integer> partitionsList = new ArrayList<Integer>();
+               if (metadata != null && metadata.size() > 0) {
+                       for (PartitionMetadata partData : 
metadata.get(0).partitionsMetadata()) {
+                               partitionsList.add(partData.partitionId());
+                       }
+               }
+
+               return partitionsList;
+       }
+
+       private BlockingChannel getOffsetManagerChannel(String zookeeperHost, 
String group) {
+               int correlationId = 0;
+               for (String broker : getBrokers(zookeeperHost)) {
+                       String[] splitBroker = broker.split(":");
+                       String ip = splitBroker[0];
+                       String port = splitBroker[1];
+
+                       int retryCount = 0;
+                       //it is possible that a ConsumerCoordinator is not 
available yet, if this is the case we need to wait and try again.
+                       while (retryCount < 5) {
+                               retryCount++;
+
+                               BlockingChannel channel = new 
BlockingChannel(ip, Integer.valueOf(port), 
BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(),
+                                               5000 /* read timeout in millis 
*/);
+                               channel.connect();
+                               channel.send(new GroupCoordinatorRequest(group, 
OffsetRequest.CurrentVersion(), correlationId++, CLIENT_ID));
+                               GroupCoordinatorResponse metadataResponse = 
GroupCoordinatorResponse.readFrom(channel.receive().payload());
+
+                               if (metadataResponse.errorCode() == 
ErrorMapping.NoError()) {
+                                       BrokerEndPoint endPoint = metadataResponse.coordinatorOpt().get();
+                                       // reconnect if the coordinator differs in host or port; the original
+                                       // compared the String port against the int port via equals(), which is always false
+                                       if (!endPoint.host().equals(ip) || endPoint.port() != Integer.parseInt(port)) {
+                                               channel.disconnect();
+                                               channel = new 
BlockingChannel(endPoint.host(), endPoint.port(), 
BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 
5000);
+                                               channel.connect();
+                                       }
+                                       return channel;
+                               } else if (metadataResponse.errorCode() == 
ErrorMapping.ConsumerCoordinatorNotAvailableCode()
+                                               || metadataResponse.errorCode() 
== ErrorMapping.OffsetsLoadInProgressCode()) {
+                                       //wait and try again
+                                       try {
+                                               Thread.sleep(2000);
+                                       } catch (InterruptedException e) {
+                                               Thread.currentThread().interrupt();
+                                       }
+                               } else {
+                                       //unknown error, continue with next 
broker
+                                       break;
+                               }
+                       }
+               }
+               throw new RuntimeException("Kafka Consumer Broker not 
available!");
+       }
+}
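Several methods in this class (offset fetching, topic metadata lookup, coordinator channel setup) repeat the same bounded-retry pattern: up to 5 attempts with a 2-second sleep while a coordinator or leader becomes available. A generic sketch of that pattern, with hypothetical names, that also preserves the thread's interrupt flag:

```java
import java.util.Optional;
import java.util.function.Supplier;

public class BoundedRetrySketch {

    // Runs a probe up to maxAttempts times, sleeping between failed attempts.
    // Returns the first non-empty result, or empty if all attempts fail or the
    // thread is interrupted while waiting.
    static <T> Optional<T> retry(Supplier<Optional<T>> probe, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Optional<T> result = probe.get();
            if (result.isPresent()) {
                return result;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    // restore the interrupt flag instead of swallowing it
                    Thread.currentThread().interrupt();
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Succeeds on the third attempt, mirroring a coordinator that becomes
        // available after a short delay.
        Supplier<Optional<String>> probe = () -> {
            calls[0]++;
            return calls[0] < 3 ? Optional.<String>empty() : Optional.of("coordinator");
        };
        Optional<String> r = retry(probe, 5, 10);
        // prints coordinator after 3 attempts
        System.out.println(r.get() + " after " + calls[0] + " attempts");
    }
}
```

Centralizing the loop this way would remove the duplicated `retryCount`/`errorFound` bookkeeping; this is only an illustrative refactoring sketch, not part of the patch.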

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaProducerManager.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaProducerManager.java
 
b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaProducerManager.java
new file mode 100755
index 0000000..33c4ae0
--- /dev/null
+++ 
b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaProducerManager.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.TimeoutException;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.MessageEncryption;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+
+public class KafkaProducerManager {
+
+       private final static Logger logger = 
Logger.getLogger(KafkaProducerManager.class.getName());
+       private static KafkaProducer<String, String> producer;
+
+       protected Properties getKafkaProducerConfig() {
+               SettingsManager odfConfig = new 
ODFFactory().create().getSettingsManager();
+               ODFInternalFactory f = new ODFInternalFactory();
+               Properties props = odfConfig.getKafkaProducerProperties();
+               String zookeeperConnect = 
f.create(Environment.class).getZookeeperConnectString();
+               final Iterator<String> brokers = 
f.create(KafkaMonitor.class).getBrokers(zookeeperConnect).iterator();
+               StringBuilder brokersString = new StringBuilder();
+               while (brokers.hasNext()) {
+                       brokersString.append(brokers.next());
+                       if (brokers.hasNext()) {
+                               brokersString.append(",");
+                       }
+               }
+               logger.info("Sending messages to brokers: " + 
brokersString.toString());
+               props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokersString.toString());
+               props.put(ProducerConfig.CLIENT_ID_CONFIG, 
"ODF_MESSAGE_PRODUCER");
+               return props;
+       }
+
+       private KafkaProducer<String, String> getProducer() {
+               if (producer == null) {
+                       producer = new KafkaProducer<String, 
String>(getKafkaProducerConfig());
+               }
+               return producer;
+       }
+
+       public void sendMsg(String topicName, String key, String value) {
+               MessageEncryption msgEncryption = new 
ODFInternalFactory().create(MessageEncryption.class);
+               value = msgEncryption.encrypt(value);
+               sendMsg(topicName, key, value, null);
+       }
+
+       public void sendMsg(final String topicName, final String key, final 
String value, final Callback callback) {
+               ProducerRecord<String, String> producerRecord = new 
ProducerRecord<String, String>(topicName, key, value);
+               try {
+                       int retryCount = 0;
+                       boolean msgSent = false;
+                       while (retryCount < 5 && !msgSent) {
+                               try {
+                                       getProducer().send(producerRecord, callback).get(4000, TimeUnit.MILLISECONDS);
+                                       msgSent = true;
+                               } catch (ExecutionException ex) {
+                                       if (ex.getCause() instanceof TimeoutException) {
+                                               logger.warning("Message could not be sent within 4000 ms");
+                                               retryCount++;
+                                       } else {
+                                               throw ex;
+                                       }
+                               }
+                       }
+                       if (retryCount == 5) {
+                               logger.warning("Message could not be sent within 5 retries!");
+                               logger.fine("topic: " + topicName + " key " + key + " msg " + value);
+                       }
+               } catch (Exception exc) {
+                       logger.log(Level.WARNING, "Exception while sending 
message", exc);
+                       if (producer != null) {
+                               producer.close();
+                       }
+                       producer = null;
+                       throw new RuntimeException(exc);
+               }
+       }
+
+}
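One caveat in the class above: `getProducer()` lazily creates the shared static producer without synchronization, so two threads calling `sendMsg` concurrently could each create a `KafkaProducer`. A sketch of the initialization-on-demand holder idiom that avoids this race (a plain `Object` stands in for the real producer; names are hypothetical):

```java
public class LazyHolderSketch {

    // The nested Holder class is only initialized on first access to INSTANCE,
    // and class initialization is guaranteed to be thread-safe by the JVM, so
    // no explicit locking is needed.
    private static class Holder {
        static final Object INSTANCE = new Object(); // stand-in for KafkaProducer<String, String>
    }

    static Object getProducer() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        // Both calls observe the same lazily created instance.
        // prints true
        System.out.println(getProducer() == getProducer());
    }
}
```

Note that the original code also sets `producer = null` on failure so a fresh producer is created on the next send; a holder-based singleton would need a different reset strategy, so this sketch only addresses the unsynchronized first initialization.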

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueConsumer.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueConsumer.java
 
b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueConsumer.java
new file mode 100755
index 0000000..d0cf704
--- /dev/null
+++ 
b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueConsumer.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.messaging.MessageEncryption;
+
+import kafka.consumer.ConsumerTimeoutException;
+
+public class KafkaQueueConsumer implements ODFRunnable {
+       private Logger logger = 
Logger.getLogger(KafkaQueueConsumer.class.getName());
+       final static int POLLING_DURATION_MS = 100;
+       public static final int MAX_PROCESSING_EXCEPTIONS = 3;
+       public final static int MAX_CONSUMPTION_EXCEPTIONS = 5;
+       
+       public static interface ConsumptionCallback {
+               boolean stopConsumption();
+       }
+
+       private boolean ready = false;
+
+       private String topic;
+       private KafkaConsumer<String, String> kafkaConsumer;
+       private Properties config;
+       private boolean isShutdown = false;
+       private ExecutorService executorService;
+       private QueueMessageProcessor requestConsumer;
+       private int consumptionExceptionCount = 0;
+       private ConsumptionCallback consumptionCallback;
+
+       public KafkaQueueConsumer(String topicName, Properties config, QueueMessageProcessor requestConsumer) {
+               this(topicName, config, requestConsumer, null);
+       }
+       
+       public KafkaQueueConsumer(String topicName, Properties config, QueueMessageProcessor requestConsumer, ConsumptionCallback consumptionCallback) {
+               this.topic = topicName;
+               this.config = config;
+               this.requestConsumer = requestConsumer;
+               this.consumptionCallback = consumptionCallback;
+               if (this.consumptionCallback == null) {
+                       this.consumptionCallback = new ConsumptionCallback() {
+
+                               @Override
+                               public boolean stopConsumption() {
+                                       // default: never stop
+                                       return false;
+                               }
+                               
+                       };
+               }
+       }
+
+       public void run() {
+               final String groupId = this.config.getProperty("group.id");
+               while (consumptionExceptionCount < MAX_CONSUMPTION_EXCEPTIONS && !isShutdown) {
+                       try {
+                               logger.info("Starting consumption for " + groupId);
+                               startConsumption();
+                       } catch (RuntimeException ex) {
+                               if (ex.getCause() instanceof WakeupException) {
+                                       isShutdown = true;
+                               } else {
+                                       consumptionExceptionCount++;
+                                       logger.log(Level.WARNING, "Caught exception in KafkaQueueConsumer " + groupId + ", restarting consumption!", ex);
+                               }
+                               if (this.kafkaConsumer != null) {
+                                       this.kafkaConsumer.close();
+                                       this.kafkaConsumer = null;
+                               }
+                       } catch (Exception e) {
+                               consumptionExceptionCount++;
+                               logger.log(Level.WARNING, "Caught exception in KafkaQueueConsumer " + groupId + ", restarting consumption!", e);
+                               if (this.kafkaConsumer != null) {
+                                       this.kafkaConsumer.close();
+                                       this.kafkaConsumer = null;
+                               }
+                       }
+               }
+               logger.info("Stopping consumption for " + groupId);
+               this.ready = false;
+               this.cancel();
+       }
+
+       private void startConsumption() {
+               if (this.consumptionCallback.stopConsumption()) {
+                       return;
+               }
+               Exception caughtException = null;
+               final String logPrefix = this + " consumer: [" + this.requestConsumer.getClass().getSimpleName() + "], on " + topic + ": ";
+               try {
+                       if (this.kafkaConsumer == null) {
+                               logger.fine(logPrefix + " create new consumer for topic " + topic);
+                               try {
+                                       this.kafkaConsumer = new KafkaConsumer<String, String>(config);
+                                       kafkaConsumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
+
+                                               @Override
+                                               public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                                                       logger.fine(logPrefix + " partitions revoked " + topic + " new partitions: " + partitions.size());
+                                               }
+
+                                               public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                                                       logger.finer(logPrefix + " partitions assigned " + topic + " , new partitions: " + partitions.size());
+                                                       logger.info(logPrefix + "consumer is ready with " + partitions.size() + " partitions assigned");
+                                                       ready = true;
+                                               }
+                                       });
+                               } catch (ZkTimeoutException zkte) {
+                                       String zkHosts = config.getProperty("zookeeper.connect");
+                                       logger.log(Level.SEVERE, logPrefix + " Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zkHosts);
+                                       throw zkte;
+                               }
+                       }
+                       logger.log(Level.INFO, logPrefix + " Consumer ''{1}'' is now listening on ODF queue ''{0}'' with configuration {2}", new Object[] { topic, requestConsumer, config });
+                       MessageEncryption msgEncryption = new ODFInternalFactory().create(MessageEncryption.class);
+                       while (!Thread.interrupted() && !isShutdown && kafkaConsumer != null) {
+                               if (this.consumptionCallback.stopConsumption()) {
+                                       isShutdown = true;
+                                       break;
+                               }
+                               ConsumerRecords<String, String> records = kafkaConsumer.poll(POLLING_DURATION_MS);
+                               kafkaConsumer.commitSync(); // commit offset immediately to avoid timeouts for long-running processors
+                               for (TopicPartition partition : kafkaConsumer.assignment()) {
+                                       List<ConsumerRecord<String, String>> polledRecords = records.records(partition);
+                                       if (!polledRecords.isEmpty()) {
+                                               logger.fine(polledRecords.get(0).value() + " offset: " + polledRecords.get(0).offset());
+                                       }
+
+                                       for (int no = 0; no < polledRecords.size(); no++) {
+                                               ConsumerRecord<String, String> record = polledRecords.get(no);
+                                               String s = record.value();
+                                               logger.log(Level.FINEST, logPrefix + "Decrypting message {0}", s);
+                                               try {
+                                                       s = msgEncryption.decrypt(s);
+                                               } catch (Exception exc) {
+                                                       logger.log(Level.WARNING, "Message could not be decrypted, ignoring it", exc);
+                                                       s = null;
+                                               }
+                                               if (s != null) {
+                                                       logger.log(Level.FINEST, logPrefix + "Sending message to consumer ''{0}''", s);
+                                                       int exceptionCount = 0;
+                                                       boolean processedSuccessfully = false;
+                                                       while (exceptionCount < MAX_PROCESSING_EXCEPTIONS && !processedSuccessfully) {
+                                                               try {
+                                                                       exceptionCount++;
+                                                                       this.requestConsumer.process(executorService, s, record.partition(), record.offset());
+                                                                       processedSuccessfully = true;
+                                                               } catch (Exception ex) {
+                                                                       logger.log(Level.WARNING, "Exception " + exceptionCount + " caught processing message!", ex);
+                                                               }
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               } catch (ConsumerTimeoutException e) {
+                       String msg = MessageFormat.format(" Caught timeout on queue ''{0}''", topic);
+                       logger.log(Level.WARNING, logPrefix + msg, e);
+                       caughtException = e;
+               } catch (Exception exc) {
+                       String msg = MessageFormat.format(" Caught exception on queue ''{0}''", topic);
+                       logger.log(Level.WARNING, logPrefix + msg, exc);
+                       caughtException = exc;
+               } finally {
+                       if (kafkaConsumer != null) {
+                               logger.log(Level.FINE, logPrefix + "Closing consumer on topic ''{0}''", topic);
+                               kafkaConsumer.close();
+                               logger.log(Level.FINE, logPrefix + "Closed consumer on topic ''{0}''", topic);
+                               kafkaConsumer = null;
+                       }
+               }
+               logger.log(Level.INFO, logPrefix + "Finished consumer on topic ''{0}''", topic);
+               if (caughtException != null) {
+                       throw new RuntimeException(caughtException);
+               }
+       }
+
+       public void cancel() {
+               logger.log(Level.INFO, "Shutting down consumer on topic ''{0}''", topic);
+               if (this.kafkaConsumer != null) {
+                       this.kafkaConsumer.wakeup();
+               }
+               isShutdown = true;
+       }
+
+       public boolean isShutdown() {
+               return isShutdown;
+       }
+
+       @Override
+       public void setExecutorService(ExecutorService service) {
+               this.executorService = service;
+       }
+
+       @Override
+       public boolean isReady() {
+               return ready;
+       }
+
+}
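
The bounded-retry loop the consumer applies to each polled message (the `MAX_PROCESSING_EXCEPTIONS` block) can be sketched in isolation. This is an illustrative, self-contained sketch and not part of the commit: `Processor` and `processWithRetry` are hypothetical names standing in for `QueueMessageProcessor.process(...)` and the inline retry loop.

```java
// Illustrative sketch (not from this commit): the bounded-retry pattern that
// KafkaQueueConsumer applies to each polled message. "Processor" and
// "processWithRetry" are hypothetical stand-ins for QueueMessageProcessor.
public class RetrySketch {

    static final int MAX_PROCESSING_EXCEPTIONS = 3;

    /** Stand-in for QueueMessageProcessor: processing may fail transiently. */
    interface Processor {
        void process(String message) throws Exception;
    }

    /**
     * Attempts to process a message at most MAX_PROCESSING_EXCEPTIONS times.
     * Returns true if one attempt succeeded; false if the retry budget was
     * exhausted, in which case the message is dropped (as in the consumer loop).
     */
    static boolean processWithRetry(Processor processor, String message) {
        int exceptionCount = 0;
        boolean processedSuccessfully = false;
        while (exceptionCount < MAX_PROCESSING_EXCEPTIONS && !processedSuccessfully) {
            try {
                exceptionCount++;
                processor.process(message);
                processedSuccessfully = true;
            } catch (Exception ex) {
                System.out.println("Exception " + exceptionCount + " caught processing message");
            }
        }
        return processedSuccessfully;
    }

    public static void main(String[] args) {
        // A processor that fails twice, then succeeds on the third attempt.
        final int[] attempts = {0};
        boolean ok = processWithRetry(msg -> {
            if (++attempts[0] < 3) {
                throw new RuntimeException("transient failure");
            }
        }, "hello");
        System.out.println("processed=" + ok + " attempts=" + attempts[0]);
    }
}
```

Note that a message whose processing still fails after the last attempt is skipped rather than redelivered, because the consumer commits the offset immediately after polling.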
