http://git-wip-us.apache.org/repos/asf/apex-site/blob/357a5a07/docs/malhar-3.6/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/docs/malhar-3.6/mkdocs/search_index.json 
b/docs/malhar-3.6/mkdocs/search_index.json
new file mode 100644
index 0000000..9248536
--- /dev/null
+++ b/docs/malhar-3.6/mkdocs/search_index.json
@@ -0,0 +1,1429 @@
+{
+    "docs": [
+        {
+            "location": "/", 
+            "text": "Apache Apex Malhar\n\n\nApache Apex Malhar is an open 
source operator and codec library that can be used with the \nApache Apex\n 
platform to build real-time streaming applications.  Enabling users to extract 
value quickly, Malhar operators help get data in, analyze it in real-time, and 
get data out of Hadoop.  In addition to the operators, the library contains a 
number of demos applications, demonstrating operator features and 
capabilities.\n\n\n\n\nCapabilities common across Malhar operators\n\n\nFor 
most streaming platforms, connectors are afterthoughts and often end up being 
simple \u2018bolt-ons\u2019 to the platform. As a result they often cause 
performance issues or data loss when put through failure scenarios and 
scalability requirements. Malhar operators do not face these issues as they 
were designed to be integral parts of Apex. Hence, they have following core 
streaming runtime capabilities\n\n\n\n\nFault tolerance\n \u2013 Malhar 
operators where appli
 cable have fault tolerance built in. They use the checkpoint capability 
provided by the framework to ensure that there is no data loss under ANY 
failure scenario.\n\n\nProcessing guarantees\n \u2013 Malhar operators where 
applicable provide out of the box support for ALL three processing guarantees 
\u2013 exactly once, at-least once, and at-most once WITHOUT requiring the user 
to write any additional code.  Some operators, like MQTT operator, deal with 
source systems that can not track processed data and hence need the operators 
to keep track of the data.  Malhar has support for a generic operator that uses 
alternate storage like HDFS to facilitate this.  Finally for databases that 
support transactions or support any sort of atomic batch operations Malhar 
operators can do exactly once down to the tuple level.\n\n\nDynamic updates\n 
\u2013 Based on changing business conditions you often have to tweak several 
parameters used by the operators in your streaming application without incur
 ring any application downtime. You can also change properties of a Malhar 
operator at runtime without having to bring down the application.\n\n\nEase of 
extensibility\n \u2013 Malhar operators are based on templates that are easy to 
extend.\n\n\nPartitioning support\n \u2013 In streaming applications the input 
data stream often needs to be partitioned based on the contents of the stream. 
Also for operators that ingest data from external systems partitioning needs to 
be done based on the capabilities of the external system.  For example with 
Kafka, the operator can automatically scale up or down based on the changes in 
the number of Kafka partitions.\n\n\n\n\nOperator Library 
Overview\n\n\nInput/output connectors\n\n\nBelow is a summary of the various 
sub categories of input and output operators. Input operators also have a 
corresponding output operator\n\n\n\n\nFile Systems\n \u2013 Most streaming 
analytics use cases require the data to be stored in HDFS or perhaps S3 if the 
applica
 tion is running in AWS.  Users often need to re-run their streaming analytical 
applications against historical data or consume data from upstream processes 
that are perhaps writing to some NFS share.  Apex supports input \n output 
operators for HDFS, S3, NFS \n Local Files.  There are also File Splitter and 
Block Reader operators, which can accelecate processing of large files by 
splitting and paralellizing the work across non-overlapping sets of file 
blocks.\n\n\nRelational Databases\n \u2013 Most stream processing use cases 
require some reference data lookups to enrich, tag or filter streaming data. 
There is also a need to save results of the streaming analytical computation to 
a database so an operational dashboard can see them. Apex supports a JDBC 
operator so you can read/write data from any JDBC compliant RDBMS like Oracle, 
MySQL, Sqlite, etc.\n\n\nNoSQL Databases\n \u2013 NoSQL key-value pair 
databases like Cassandra \n HBase are a common part of streaming analytics 
applicati
 on architectures to lookup reference data or store results.  Malhar has 
operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and 
CouchDB.\n\n\nMessaging Systems\n \u2013 Kafka, JMS, and similar systems are 
the workhorses of messaging infrastructure in most enterprises.  Malhar has a 
robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, 
and RabbitMQ messages.\n\n\nNotification Systems\n \u2013 Malhar includes an 
operator for sending notifications via SMTP.\n\n\nIn-memory Databases \n 
Caching platforms\n - Some streaming use cases need instantaneous access to 
shared state across the application. Caching platforms and in-memory databases 
serve this purpose really well. To support these use cases, Malhar has 
operators for memcached and Redis.\n\n\nSocial Media\n - Malhar includes an 
operator to connect to the popular Twitter stream fire hose.\n\n\nProtocols\n - 
Malhar provides connectors that can communicate in HTTP, RSS, Socket, 
WebSocket, FTP, and MQT
 T.\n\n\n\n\nParsers\n\n\nThere are many industry vertical specific data 
formats that a streaming application developer might need to parse. Often there 
are existing parsers available for these that can be directly plugged into an 
Apache Apex application. For example in the Telco space, a Java based CDR 
parser can be directly plugged into Apache Apex operator. To further simplify 
development experience, Malhar also provides some operators for parsing common 
formats like XML (DOM \n SAX), JSON (flat map converter), Apache log files, 
syslog, etc.\n\n\nStream manipulation\n\n\nStreaming data inevitably needs 
processing to clean, filter, tag, summarize, etc. The goal of Malhar is to 
enable the application developer to focus on WHAT needs to be done to the 
stream to get it in the right format and not worry about the HOW.  Malhar has 
several operators to perform the common stream manipulation actions like \u2013 
GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, Out
 er join, Select, Update etc.\n\n\nCompute\n\n\nOne of the most important 
promises of a streaming analytics platform like Apache Apex is the ability to 
do analytics in real-time. However delivering on the promise becomes really 
difficult when the platform does not provide out of the box operators to 
support variety of common compute functions as the user then has to worry about 
making these scalable, fault tolerant, stateful, etc.  Malhar takes this 
responsibility away from the application developer by providing a variety of 
out of the box computational operators.\n\n\nBelow is just a snapshot of the 
compute operators available in Malhar\n\n\n\n\nStatistics and math - Various 
mathematical and statistical computations over application defined time 
windows.\n\n\nFiltering and pattern matching\n\n\nSorting, maps, frequency, 
TopN, BottomN\n\n\nRandom data generators\n\n\n\n\nLanguages 
Support\n\n\nMigrating to a new platform often requires re-use of the existing 
code that would be diffic
 ult or time-consuming to re-write.  With this in mind, Malhar supports 
invocation of code written in other languages by wrapping them in one of the 
library operators, and allows execution of software written 
in:\n\n\n\n\nJavaScript\n\n\nPython\n\n\nR\n\n\nRuby", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#apache-apex-malhar", 
+            "text": "Apache Apex Malhar is an open source operator and codec 
library that can be used with the  Apache Apex  platform to build real-time 
streaming applications.  Enabling users to extract value quickly, Malhar 
operators help get data in, analyze it in real-time, and get data out of 
Hadoop.  In addition to the operators, the library contains a number of demos 
applications, demonstrating operator features and capabilities.", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#capabilities-common-across-malhar-operators", 
+            "text": "For most streaming platforms, connectors are 
afterthoughts and often end up being simple \u2018bolt-ons\u2019 to the 
platform. As a result they often cause performance issues or data loss when put 
through failure scenarios and scalability requirements. Malhar operators do not 
face these issues as they were designed to be integral parts of Apex. Hence, 
they have following core streaming runtime capabilities   Fault tolerance  
\u2013 Malhar operators where applicable have fault tolerance built in. They 
use the checkpoint capability provided by the framework to ensure that there is 
no data loss under ANY failure scenario.  Processing guarantees  \u2013 Malhar 
operators where applicable provide out of the box support for ALL three 
processing guarantees \u2013 exactly once, at-least once, and at-most once 
WITHOUT requiring the user to write any additional code.  Some operators, like 
MQTT operator, deal with source systems that can not track processed data and 
hence n
 eed the operators to keep track of the data.  Malhar has support for a generic 
operator that uses alternate storage like HDFS to facilitate this.  Finally for 
databases that support transactions or support any sort of atomic batch 
operations Malhar operators can do exactly once down to the tuple level.  
Dynamic updates  \u2013 Based on changing business conditions you often have to 
tweak several parameters used by the operators in your streaming application 
without incurring any application downtime. You can also change properties of a 
Malhar operator at runtime without having to bring down the application.  Ease 
of extensibility  \u2013 Malhar operators are based on templates that are easy 
to extend.  Partitioning support  \u2013 In streaming applications the input 
data stream often needs to be partitioned based on the contents of the stream. 
Also for operators that ingest data from external systems partitioning needs to 
be done based on the capabilities of the external system.  Fo
 r example with Kafka, the operator can automatically scale up or down based on 
the changes in the number of Kafka partitions.", 
+            "title": "Capabilities common across Malhar operators"
+        }, 
+        {
+            "location": "/#operator-library-overview", 
+            "text": "", 
+            "title": "Operator Library Overview"
+        }, 
+        {
+            "location": "/#inputoutput-connectors", 
+            "text": "Below is a summary of the various sub categories of input 
and output operators. Input operators also have a corresponding output operator 
  File Systems  \u2013 Most streaming analytics use cases require the data to 
be stored in HDFS or perhaps S3 if the application is running in AWS.  Users 
often need to re-run their streaming analytical applications against historical 
data or consume data from upstream processes that are perhaps writing to some 
NFS share.  Apex supports input   output operators for HDFS, S3, NFS   Local 
Files.  There are also File Splitter and Block Reader operators, which can 
accelecate processing of large files by splitting and paralellizing the work 
across non-overlapping sets of file blocks.  Relational Databases  \u2013 Most 
stream processing use cases require some reference data lookups to enrich, tag 
or filter streaming data. There is also a need to save results of the streaming 
analytical computation to a database so an operational das
 hboard can see them. Apex supports a JDBC operator so you can read/write data 
from any JDBC compliant RDBMS like Oracle, MySQL, Sqlite, etc.  NoSQL Databases 
 \u2013 NoSQL key-value pair databases like Cassandra   HBase are a common part 
of streaming analytics application architectures to lookup reference data or 
store results.  Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, 
MongoDB, and CouchDB.  Messaging Systems  \u2013 Kafka, JMS, and similar 
systems are the workhorses of messaging infrastructure in most enterprises.  
Malhar has a robust, industry-tested set of operators to read and write Kafka, 
JMS, ZeroMQ, and RabbitMQ messages.  Notification Systems  \u2013 Malhar 
includes an operator for sending notifications via SMTP.  In-memory Databases   
Caching platforms  - Some streaming use cases need instantaneous access to 
shared state across the application. Caching platforms and in-memory databases 
serve this purpose really well. To support these use cases, Malhar
  has operators for memcached and Redis.  Social Media  - Malhar includes an 
operator to connect to the popular Twitter stream fire hose.  Protocols  - 
Malhar provides connectors that can communicate in HTTP, RSS, Socket, 
WebSocket, FTP, and MQTT.", 
+            "title": "Input/output connectors"
+        }, 
+        {
+            "location": "/#parsers", 
+            "text": "There are many industry vertical specific data formats 
that a streaming application developer might need to parse. Often there are 
existing parsers available for these that can be directly plugged into an 
Apache Apex application. For example in the Telco space, a Java based CDR 
parser can be directly plugged into Apache Apex operator. To further simplify 
development experience, Malhar also provides some operators for parsing common 
formats like XML (DOM   SAX), JSON (flat map converter), Apache log files, 
syslog, etc.", 
+            "title": "Parsers"
+        }, 
+        {
+            "location": "/#stream-manipulation", 
+            "text": "Streaming data inevitably needs processing to clean, 
filter, tag, summarize, etc. The goal of Malhar is to enable the application 
developer to focus on WHAT needs to be done to the stream to get it in the 
right format and not worry about the HOW.  Malhar has several operators to 
perform the common stream manipulation actions like \u2013 GroupBy, Join, 
Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, Outer join, Select, 
Update etc.", 
+            "title": "Stream manipulation"
+        }, 
+        {
+            "location": "/#compute", 
+            "text": "One of the most important promises of a streaming 
analytics platform like Apache Apex is the ability to do analytics in 
real-time. However delivering on the promise becomes really difficult when the 
platform does not provide out of the box operators to support variety of common 
compute functions as the user then has to worry about making these scalable, 
fault tolerant, stateful, etc.  Malhar takes this responsibility away from the 
application developer by providing a variety of out of the box computational 
operators.  Below is just a snapshot of the compute operators available in 
Malhar   Statistics and math - Various mathematical and statistical 
computations over application defined time windows.  Filtering and pattern 
matching  Sorting, maps, frequency, TopN, BottomN  Random data generators", 
+            "title": "Compute"
+        }, 
+        {
+            "location": "/#languages-support", 
+            "text": "Migrating to a new platform often requires re-use of the 
existing code that would be difficult or time-consuming to re-write.  With this 
in mind, Malhar supports invocation of code written in other languages by 
wrapping them in one of the library operators, and allows execution of software 
written in:   JavaScript  Python  R  Ruby", 
+            "title": "Languages Support"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/", 
+            "text": "KAFKA INPUT OPERATOR\n\n\nIntroduction\n\n\nApache 
Kafka\n is a pull-based and distributed publish subscribe messaging 
system,\ntopics are partitioned and replicated across nodes. \n\n\nThe Kafka 
input operator consumes data from the partitions of a Kafka topic for 
processing in Apex. \nThe operator has the ability to automatically scale with 
the Kafka partitioning for high throughput. \nIt is fault-tolerant (consumer 
offset checkpointing) and guarantees idempotency to allow exactly-once results 
in the downstream pipeline.\n\n\nFor more information about the operator design 
see this \npresentation\n\nand for processing guarantees this 
\nblog\n.\n\n\nThere are two separate implementations of the input 
operator,\none built against Kafka 0.8 client and a newer version for 
the\nKafka 0.9 consumer API that also works with MapR Streams.\nThese reside in 
different packages and are described separately below.\n\n\nKafka Input 
Operator for Kafka 0.8.x\n\n\nPackage: \ncom
 .datatorrent.contrib.kafka\n\n\nMaven artifact: 
\nmalhar-contrib\n\n\nAbstractKafkaInputOperator\n\n\nThis is the abstract 
implementation that serves as base class for consuming messages from Kafka 
messaging system. This class doesn\u2019t have any 
ports.\n\n\n\n\nConfiguration 
Parameters\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nDescription\n\n\n\n\n\n\nmaxTuplesPerWindow\n\n\nControls
 the maximum number of messages emitted in each streaming window from this 
operator. Minimum value is 1. Default value = MAX_VALUE 
\n\n\n\n\n\n\nidempotentStorageManager\n\n\nThis is an instance of 
IdempotentStorageManager. Idempotency ensures that the operator will process 
the same set of messages in a window before and after a failure. For example, 
let's say the operator completed window 10 and failed somewhere between window 
11. If the operator gets restored at window 10 then it will process the same 
messages again in window 10 which it did in the previous run before the 
failure. Idempotency is impor
 tant but comes with higher cost because at the end of each window the operator 
needs to persist some state with respect to that window. Default Value = 
com.datatorrent.lib.io.IdempotentStorageManager.\nNoopIdempotentStorageManager\n\n\n\n\n\n\nstrategy\n\n\nOperator
 supports two types of partitioning strategies, ONE_TO_ONE and 
ONE_TO_MANY.\n\n\nONE_TO_ONE: If this is enabled, the AppMaster creates one 
input operator instance per Kafka topic partition. So the number of Kafka topic 
partitions equals the number of operator instances.\n\n\nONE_TO_MANY: The 
AppMaster creates K = min(initialPartitionCount, N) Kafka input operator 
instances where N is the number of Kafka topic partitions. If K is less than N, 
the remaining topic partitions are assigned to the K operator instances in 
round-robin fashion. If K is less than initialPartitionCount, the AppMaster 
creates one input operator instance per Kafka topic partition. For example, if 
initialPartitionCount = 5 and number of Kafka partition
 s(N) = 2 then AppMaster creates 2 Kafka input operator instances.\nDefault 
Value = ONE_TO_ONE\n\n\n\n\n\n\nmsgRateUpperBound\n\n\nMaximum messages upper 
bound. Operator repartitions when the \nmsgProcessedPS\n exceeds this bound. 
\nmsgProcessedPS\n is the average number of messages processed per second by 
this operator.\n\n\n\n\n\n\nbyteRateUpperBound\n\n\nMaximum bytes upper bound. 
Operator repartitions when the \nbytesPS\n exceeds this bound. \nbytesPS\n is 
the average number of bytes processed per second by this 
operator.\n\n\n\n\n\n\n\n\noffsetManager\n\n\nThis is an optional parameter 
that is useful when the application restarts or start at specific offsets 
(offsets are explained below)\n\n\n\n\n\n\nrepartitionInterval\n\n\nInterval 
specified in milliseconds. This value specifies the minimum time required 
between two repartition actions. Default Value = 30 
Seconds\n\n\n\n\n\n\nrepartitionCheckInterval\n\n\nInterval specified in 
milliseconds. This value specifies the minimum int
 erval between two offset updates. Default Value = 5 
Seconds\n\n\n\n\n\n\ninitialPartitionCount\n\n\nWhen the ONE_TO_MANY partition 
strategy is enabled, this value indicates the number of Kafka input operator 
instances. Default Value = 1\n\n\n\n\n\n\nconsumer\n\n\nThis is an instance of 
com.datatorrent.contrib.kafka.KafkaConsumer. Default Value = Instance of 
SimpleKafkaConsumer.\n\n\n\n\n\n\n\n\nAbstract Methods\n\n\nvoid 
emitTuple(Message message)\n: Abstract method that emits tuples extracted from 
Kafka message.\n\n\nKafkaConsumer\n\n\nThis is an abstract implementation of 
Kafka consumer. It sends the fetch\nrequests to the leading brokers of Kafka 
partitions. For each request,\nit receives the set of messages and stores them 
into the buffer which is\nArrayBlockingQueue. SimpleKafkaConsumer\u00a0which 
extends\nKafkaConsumer and serves the functionality of Simple Consumer API 
and\nHighLevelKafkaConsumer which extends KafkaConsumer and \u00a0serves 
the\nfunctionality of High Level Co
 nsumer API.\n\n\nPre-requisites\n\n\nThis operator uses the Kafka 0.8.2.1 
client consumer API\nand will work with 0.8.x and 0.7.x versions of Kafka 
broker.\n\n\nConfiguration 
Parameters\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nType\n\n\nDefault\n\n\nDescription\n\n\n\n\n\n\nzookeeper\n\n\nString\n\n\n\n\nSpecifies
 the zookeeper quorum of Kafka clusters that you want to consume messages from. 
zookeeper \u00a0is a string in the form of 
hostname1:port1,hostname2:port2,hostname3:port3 \u00a0where 
hostname1,hostname2,hostname3 are hosts and port1,port2,port3 are ports of 
zookeeper server. \u00a0If the topic name is the same across the Kafka clusters 
and want to consume data from these clusters, then configure the zookeeper as 
follows: 
c1::hs1:p1,hs2:p2,hs3:p3;c2::hs4:p4,hs5:p5,c3::hs6:p6\n\n\nwhere\n\n\nc1,c2,c3 
indicates the cluster names, hs1,hs2,hs3,hs4,hs5,hs6 are zookeeper hosts and 
p1,p2,p3,p4,p5,p6 are corresponding ports. Here, cluster name is optional in 
case of single cl
 uster\n\n\n\n\n\n\ncacheSize\n\n\nint\n\n\n1024\n\n\nMaximum of buffered 
messages hold in 
memory.\n\n\n\n\n\n\ntopic\n\n\nString\n\n\ndefault_topic\n\n\nIndicates the 
name of the 
topic.\n\n\n\n\n\n\ninitialOffset\n\n\nString\n\n\nlatest\n\n\nIndicates the 
type of offset i.e, \u201cearliest or latest\u201d. If initialOffset is 
\u201clatest\u201d, then the operator consumes messages from latest point of 
Kafka queue. If initialOffset is \u201cearliest\u201d, then the operator 
consumes messages starting from message queue. This can be overridden by 
OffsetManager.\n\n\n\n\n\n\n\n\n\nAbstract Methods\n\n\n\n\nvoid 
commitOffset(): Commit the offsets at checkpoint.\n\n\nMap \nKafkaPartition, 
Long\n getCurrentOffsets(): Return the current\n    offset 
status.\n\n\nresetPartitionsAndOffset(Set \nKafkaPartition\n partitionIds,\n    
Map \nKafkaPartition, Long\n startOffset): Reset the partitions with\n    
parittionIds and offsets with startOffset.\n\n\n\n\nConfiguration 
Parameters\u00a0for Simpl
 
eKafkaConsumer\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nType\n\n\nDefault\n\n\nDescription\n\n\n\n\n\n\nbufferSize\n\n\nint\n\n\n1
 MB\n\n\nSpecifies the maximum total size of messages for each fetch 
request.\n\n\n\n\n\n\nmetadataRefreshInterval\n\n\nint\n\n\n30 
Seconds\n\n\nInterval in between refresh the metadata change(broker change) in 
milliseconds. Enabling metadata refresh guarantees an automatic reconnect when 
a new broker is elected as the host. A value of -1 disables this 
feature.\n\n\n\n\n\n\nmetadataRefreshRetryLimit\n\n\nint\n\n\n-1\n\n\nSpecifies 
the maximum brokers' metadata refresh retry limit. -1 means unlimited 
retry.\n\n\n\n\n\n\n\n\n\nOffsetManager\n\n\nThis is an interface for offset 
management and is useful when consuming data\nfrom specified offsets. Updates 
the offsets for all the Kafka partitions\nperiodically. Below is the code 
snippet:\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\n\n\npublic interface 
OffsetManager\n{\n  public Map\nKafkaPartition,
  Long\n loadInitialOffsets();\n  public void 
updateOffsets(Map\nKafkaPartition, Long\n 
offsetsOfPartitions);\n}\n\n\n\n\nAbstract Methods\n\n\nMap \nKafkaPartition, 
Long\n loadInitialOffsets()\n: Specifies the initial offset for consuming 
messages; called at the activation 
stage.\n\n\nupdateOffsets(Map\nKafkaPartition, Long\n offsetsOfPartitions)\n: 
\u00a0This\nmethod is called at every repartitionCheckInterval to update 
offsets.\n\n\nPartitioning\n\n\nThe logical instance of the KafkaInputOperator 
acts as the Partitioner\nas well as a StatsListener. This is because 
the\nAbstractKafkaInputOperator implements both 
the\ncom.datatorrent.api.Partitioner and 
com.datatorrent.api.StatsListener\ninterfaces and provides an implementation of 
definePartitions(...) and\nprocessStats(...) which makes it 
auto-scalable.\n\n\nResponse processStats(BatchedOperatorStats stats)\n\n\nThe 
application master invokes this method on the logical instance with\nthe stats 
(tuplesProcessedPS, bytesPS, etc.) of
  each partition.\nRe-partitioning happens based on whether any new Kafka 
partitions added for\nthe topic or bytesPS and msgPS cross their respective 
upper bounds.\n\n\nDefinePartitions\n\n\nBased on the repartitionRequired field 
of the Response object which is\nreturned by processStats(...) method, the 
application master invokes\ndefinePartitions(...) on the logical instance which 
is also the\npartitioner instance. Dynamic partition can be disabled by setting 
the\nparameter repartitionInterval value to a negative 
value.\n\n\nAbstractSinglePortKafkaInputOperator\n\n\nThis class extends 
AbstractKafkaInputOperator to emit messages through single output 
port.\n\n\nPorts\n\n\noutputPort \nT\n: Tuples extracted from Kafka messages 
are emitted through this port.\n\n\nAbstract Methods\n\n\nT getTuple(Message 
msg)\n: Converts the Kafka message to tuple.\n\n\nConcrete 
Classes\n\n\n\n\nKafkaSinglePortStringInputOperator: extends 
\nAbstractSinglePortKafkaInputOperator\n, extracts string from Ka
 fka message.\n\n\nKafkaSinglePortByteArrayInputOperator: extends 
\nAbstractSinglePortKafkaInputOperator\n, extracts byte array from Kafka 
message.\n\n\n\n\nApplication Example\n\n\nThis section builds an Apex 
application using Kafka input operator.\nBelow is the code 
snippet:\n\n\n@ApplicationAnnotation(name = \nKafkaApp\n)\npublic class 
ExampleKafkaApplication implements StreamingApplication\n{\n  @Override\n  
public void populateDAG(DAG dag, Configuration entries)\n  {\n    
KafkaSinglePortByteArrayInputOperator input =  
dag.addOperator(\nMessageReader\n, new 
KafkaSinglePortByteArrayInputOperator());\n    ConsoleOutputOperator output = 
dag.addOperator(\nOutput\n, new ConsoleOutputOperator());\n    
dag.addStream(\nMessageData\n, input.outputPort, output.input);\n  
}\n}\n\n\n\n\nBelow is the configuration for \u201ctest\u201d Kafka topic name 
and\n\u201clocalhost:2181\u201d is the zookeeper forum:\n\n\nproperty\n\n  
\nname\ndt.operator.MessageReader.prop.topic\n/name\n\n  \nvalue\nte
 st\n/value\n\n\n/property\n\n\n\nproperty\n\n  
\nname\ndt.operator.KafkaInputOperator.prop.zookeeper\n/nam\n\n  
\nvalue\nlocalhost:2181\n/value\n\n\n/property\n\n\n\n\n\nKafka Input Operator 
for Kafka 0.9.x\n\n\nPackage: \norg.apache.apex.malhar.kafka\n\n\nMaven 
Artifact: \nmalhar-kafka\n\n\nThis version uses the new 0.9 version of consumer 
API and works with Kafka broker version 0.9 and later.\nThe operator is 
fault-tolerant, scalable and supports input from multiple clusters and multiple 
topics in a single operator instance.\n\n\nPre-requisites\n\n\nThis operator 
requires version 0.9.0 or later of the Kafka Consumer 
API.\n\n\nAbstractKafkaInputOperator\n\n\nPorts\n\n\n\n\nThis abstract class 
doesn't have any ports.\n\n\nConfiguration 
properties\n\n\n\n\n\n\n\n\nclusters\n - String[]\n\n\n\n\nMandatory 
Parameter.\n\n\nSpecifies the Kafka clusters that you want to consume messages 
from. To configure multi-cluster support, you need to specify the clusters 
separated by \";\".\n\n\n\n\
 n\n\n\n\ntopics\n - String[]\n\n\n\n\nMandatory Parameter.\n\n\nSpecified the 
Kafka topics that you want to consume messages from. If you want multi-topic 
support, then specify the topics separated by 
\",\".\n\n\n\n\n\n\n\n\nstrategy\n - PartitionStrategy\n\n\n\n\n\n\nOperator 
supports two types of partitioning strategies, \nONE_TO_ONE\n and 
\nONE_TO_MANY\n.\n\n\nONE_TO_ONE\n: If this is enabled, the AppMaster creates 
one input operator instance per Kafka topic partition. So the number of Kafka 
topic partitions equals the number of operator instances.\n\nONE_TO_MANY\n: The 
AppMaster creates K = min(initialPartitionCount, N) Kafka input operator 
instances where N is the number of Kafka topic partitions. If K is less than N, 
the remaining topic partitions are assigned to the K operator instances in 
round-robin fashion. If K is less than initialPartitionCount, the AppMaster 
creates one input operator instance per Kafka topic partition. For example, if 
initialPartitionCount = 5 and numb
 er of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator 
instances.\nDefault Value = 
\nPartitionStrategy.ONE_TO_ONE\n.\n\n\n\n\n\n\n\n\n\n\ninitialPartitionCount\n 
- Integer\n\n\n\n\nWhen the ONE_TO_MANY partition strategy is enabled, this 
value indicates the number of Kafka input operator instances. \n    Default 
Value = 1.\n\n\n\n\n\n\n\n\nrepartitionInterval\n - Long\n\n\n\n\nInterval 
specified in milliseconds. This value specifies the minimum time required 
between two repartition actions. \n    Default Value = 30 
Seconds.\n\n\n\n\n\n\n\n\nrepartitionCheckInterval\n - Long\n\n\n\n\nInterval 
specified in milliseconds. This value specifies the minimum interval between 
two stat checks.\n    Default Value = 5 
Seconds.\n\n\n\n\n\n\n\n\nmaxTuplesPerWindow\n - Integer\n\n\n\n\nControls the 
maximum number of messages emitted in each streaming window from this operator. 
Minimum value is 1. \n    Default value = \nMAX_VALUE\n 
\n\n\n\n\n\n\n\n\ninitialOffset\n - InitialOf
 fset\n\n\n\n\nIndicates the type of offset i.e, \nEARLIEST\n or \nLATEST\n or 
\nAPPLICATION_OR_EARLIEST\n or \nAPPLICATION_OR_LATEST\n. \n    \nLATEST\n =\n 
Consume new messages from latest offset in the topic. \n    \nEARLIEST\n =\n 
Consume all messages available in the topic.\n    \nAPPLICATION_OR_EARLIEST\n 
=\n Consume messages from committed position from last run. If there is no 
committed offset, then start consuming from beginning.\n    
\nAPPLICATION_OR_LATEST\n =\n Consumes messages from committed position from 
last run. If a committed offset is unavailable, then start consuming from 
latest position.\n    Default value = 
\nInitialOffset.APPLICATION_OR_LATEST\n\n\n\n\n\n\n\n\nmetricsRefreshInterval\n 
- Long\n\n\n\n\nInterval specified in milliseconds. This value specifies the 
minimum interval between two metric stat updates.\n    Default value = 5 
Seconds.\n\n\n\n\n\n\n\n\nconsumerTimeout\n - Long\n\n\n\n\nIndicates the 
\ntime waiting in poll\n when data is not available.\n   
  Default value = 5 Seconds.\n\n\n\n\n\n\n\n\nholdingBufferSize\n - 
Long\n\n\n\n\nIndicates the maximum number of messages kept in memory for 
emitting.\n    Default value = 1024.\n\n\n\n\n\n\n\n\nconsumerProps\n - 
Properties\n\n\n\n\nSpecify the [consumer 
properties[(http://kafka.apache.org/090/documentation.html#newconsumerconfigs) 
which are not yet set to the operator.\n\n\n\n\n\n\n\n\nwindowDataManager\n - 
WindowDataManager\n\n\n\n\nIf set to a value other than the default, such as 
\nFSWindowDataManager\n, specifies that the operator will process the same set 
of messages in a window before and after a failure. This is important but it 
comes with higher cost because at the end of each window the operator needs to 
persist some state with respect to that window.\n    Default value = 
\nWindowDataManager.NoopWindowDataManager\n.\n\n\n\n\n\n\n\n\nAbstract 
Methods\n\n\nvoid emitTuple(String cluster, ConsumerRecord\nbyte[], byte[]\n 
message)\n: Abstract method that emits tuples\nextracted
  from Kafka message.\n\n\nConcrete 
Classes\n\n\nKafkaSinglePortInputOperator\n\n\nThis class extends from 
AbstractKafkaInputOperator and defines the \ngetTuple()\n method which extracts 
byte array from Kafka message.\n\n\nPorts\n\n\noutputPort \nbyte[]\n: Tuples 
extracted from Kafka messages are emitted through this port.\n\n\nApplication 
Example\n\n\nThis section builds an Apex application using Kafka input 
operator.\nBelow is the code snippet:\n\n\n@ApplicationAnnotation(name = 
\nKafkaApp\n)\npublic class ExampleKafkaApplication implements 
StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, 
Configuration entries)\n  {\n    KafkaSinglePortInputOperator input =  
dag.addOperator(\nMessageReader\n, new KafkaSinglePortInputOperator());\n    
ConsoleOutputOperator output = dag.addOperator(\nOutput\n, new 
ConsoleOutputOperator());\n    dag.addStream(\nMessageData\n, input.outputPort, 
output.input);\n  }\n}\n\n\n\n\nBelow is the configuration for \u201ctest\u201d 
Kafka
  topic name and\n\u201clocalhost:9092\u201d is the Broker:\n\n\nproperty\n\n  
\nname\ndt.operator.MessageReader.prop.topics\n/name\n\n  
\nvalue\ntest\n/value\n\n\n/property\n\n\n\nproperty\n\n  
\nname\ndt.operator.KafkaInputOperator.prop.clusters\n/nam\n\n  
\nvalue\nlocalhost:9092\n/value\n\n\n/property\n\n\n\n\n\nBy adding following 
lines to properties file, Kafka Input Operator supports multi-topic and 
multi-cluster:\n\n\nproperty\n\n  
\nname\ndt.operator.MessageReader.prop.topics\n/name\n\n  \nvalue\ntest1, 
test2\n/value\n\n\n/property\n\n\n\nproperty\n\n  
\nname\ndt.operator.KafkaInputOperator.prop.clusters\n/nam\n\n  
\nvalue\nlocalhost:9092; localhost:9093; 
localhost:9094\n/value\n\n\n/property\n\n\n\n\n\nFor a full example application 
project, refer to 
https://github.com/DataTorrent/examples/tree/master/tutorials/kafka";, 
+            "title": "Kafka Input"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#kafka-input-operator", 
+            "text": "", 
+            "title": "KAFKA INPUT OPERATOR"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#introduction", 
+            "text": "Apache Kafka  is a pull-based and distributed publish 
subscribe messaging system,\ntopics are partitioned and replicated across 
nodes.   The Kafka input operator consumes data from the partitions of a Kafka 
topic for processing in Apex. \nThe operator has the ability to automatically 
scale with the Kafka partitioning for high throughput. \nIt is fault-tolerant 
(consumer offset checkpointing) and guarantees idempotency to allow 
exactly-once results in the downstream pipeline.  For more information about 
the operator design see this  presentation \nand for processing guarantees this 
 blog .  There are two separate implementations of the input operator,\none 
built against Kafka 0.8 client and a newer version for the\nKafka 0.9 consumer 
API that also works with MapR Streams.\nThese reside in different packages and 
are described separately below.", 
+            "title": "Introduction"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#kafka-input-operator-for-kafka-08x", 
+            "text": "Package:  com.datatorrent.contrib.kafka  Maven artifact:  
malhar-contrib", 
+            "title": "Kafka Input Operator for Kafka 0.8.x"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#abstractkafkainputoperator", 
+            "text": "This is the abstract implementation that serves as base 
class for consuming messages from Kafka messaging system. This class 
doesn\u2019t have any ports.", 
+            "title": "AbstractKafkaInputOperator"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#configuration-parameters", 
+            "text": "Parameter  Description    maxTuplesPerWindow  Controls 
the maximum number of messages emitted in each streaming window from this 
operator. Minimum value is 1. Default value = MAX_VALUE     
idempotentStorageManager  This is an instance of IdempotentStorageManager. 
Idempotency ensures that the operator will process the same set of messages in 
a window before and after a failure. For example, let's say the operator 
completed window 10 and failed somewhere between window 11. If the operator 
gets restored at window 10 then it will process the same messages again in 
window 10 which it did in the previous run before the failure. Idempotency is 
important but comes with higher cost because at the end of each window the 
operator needs to persist some state with respect to that window. Default Value 
= com.datatorrent.lib.io.IdempotentStorageManager. NoopIdempotentStorageManager 
   strategy  Operator supports two types of partitioning strategies, ONE_TO_ONE 
and ONE_TO_MANY.
   ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator 
instance per Kafka topic partition. So the number of Kafka topic partitions 
equals the number of operator instances.  ONE_TO_MANY: The AppMaster creates K 
= min(initialPartitionCount, N) Kafka input operator instances where N is the 
number of Kafka topic partitions. If K is less than N, the remaining topic 
partitions are assigned to the K operator instances in round-robin fashion. If 
K is less than initialPartitionCount, the AppMaster creates one input operator 
instance per Kafka topic partition. For example, if initialPartitionCount = 5 
and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input 
operator instances.\nDefault Value = ONE_TO_ONE    msgRateUpperBound  Maximum 
messages upper bound. Operator repartitions when the  msgProcessedPS  exceeds 
this bound.  msgProcessedPS  is the average number of messages processed per 
second by this operator.    byteRateUpperBound  Maximum bytes upper bo
 und. Operator repartitions when the  bytesPS  exceeds this bound.  bytesPS  is 
the average number of bytes processed per second by this operator.     
offsetManager  This is an optional parameter that is useful when the 
application restarts or start at specific offsets (offsets are explained below) 
   repartitionInterval  Interval specified in milliseconds. This value 
specifies the minimum time required between two repartition actions. Default 
Value = 30 Seconds    repartitionCheckInterval  Interval specified in 
milliseconds. This value specifies the minimum interval between two offset 
updates. Default Value = 5 Seconds    initialPartitionCount  When the 
ONE_TO_MANY partition strategy is enabled, this value indicates the number of 
Kafka input operator instances. Default Value = 1    consumer  This is an 
instance of com.datatorrent.contrib.kafka.KafkaConsumer. Default Value = 
Instance of SimpleKafkaConsumer.", 
+            "title": "Configuration Parameters"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods", 
+            "text": "void emitTuple(Message message) : Abstract method that 
emits tuples extracted from Kafka message.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#kafkaconsumer", 
+            "text": "This is an abstract implementation of Kafka consumer. It 
sends the fetch\nrequests to the leading brokers of Kafka partitions. For each 
request,\nit receives the set of messages and stores them into the buffer which 
is\nArrayBlockingQueue. SimpleKafkaConsumer\u00a0which extends\nKafkaConsumer 
and serves the functionality of Simple Consumer API and\nHighLevelKafkaConsumer 
which extends KafkaConsumer and \u00a0serves the\nfunctionality of High Level 
Consumer API.", 
+            "title": "KafkaConsumer"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#pre-requisites", 
+            "text": "This operator uses the Kafka 0.8.2.1 client consumer 
API\nand will work with 0.8.x and 0.7.x versions of Kafka broker.", 
+            "title": "Pre-requisites"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#configuration-parameters_1", 
+            "text": "Parameter  Type  Default  Description    zookeeper  
String   Specifies the zookeeper quorum of Kafka clusters that you want to 
consume messages from. zookeeper \u00a0is a string in the form of 
hostname1:port1,hostname2:port2,hostname3:port3 \u00a0where 
hostname1,hostname2,hostname3 are hosts and port1,port2,port3 are ports of 
zookeeper server. \u00a0If the topic name is the same across the Kafka clusters 
and want to consume data from these clusters, then configure the zookeeper as 
follows: c1::hs1:p1,hs2:p2,hs3:p3;c2::hs4:p4,hs5:p5,c3::hs6:p6  where  c1,c2,c3 
indicates the cluster names, hs1,hs2,hs3,hs4,hs5,hs6 are zookeeper hosts and 
p1,p2,p3,p4,p5,p6 are corresponding ports. Here, cluster name is optional in 
case of single cluster    cacheSize  int  1024  Maximum of buffered messages 
hold in memory.    topic  String  default_topic  Indicates the name of the 
topic.    initialOffset  String  latest  Indicates the type of offset i.e, 
\u201cearliest or latest\u201
 d. If initialOffset is \u201clatest\u201d, then the operator consumes messages 
from latest point of Kafka queue. If initialOffset is \u201cearliest\u201d, 
then the operator consumes messages starting from message queue. This can be 
overridden by OffsetManager.", 
+            "title": "Configuration Parameters"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_1", 
+            "text": "void commitOffset(): Commit the offsets at checkpoint.  
Map  KafkaPartition, Long  getCurrentOffsets(): Return the current\n    offset 
status.  resetPartitionsAndOffset(Set  KafkaPartition  partitionIds,\n    Map  
KafkaPartition, Long  startOffset): Reset the partitions with\n    parittionIds 
and offsets with startOffset.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#configuration-parameters-for-simplekafkaconsumer",
 
+            "text": "Parameter  Type  Default  Description    bufferSize  int  
1 MB  Specifies the maximum total size of messages for each fetch request.    
metadataRefreshInterval  int  30 Seconds  Interval in between refresh the 
metadata change(broker change) in milliseconds. Enabling metadata refresh 
guarantees an automatic reconnect when a new broker is elected as the host. A 
value of -1 disables this feature.    metadataRefreshRetryLimit  int  -1  
Specifies the maximum brokers' metadata refresh retry limit. -1 means unlimited 
retry.", 
+            "title": "Configuration Parameters\u00a0for SimpleKafkaConsumer"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#offsetmanager", 
+            "text": "This is an interface for offset management and is useful 
when consuming data\nfrom specified offsets. Updates the offsets for all the 
Kafka partitions\nperiodically. Below is the code 
snippet:\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0  public interface 
OffsetManager\n{\n  public Map KafkaPartition, Long  loadInitialOffsets();\n  
public void updateOffsets(Map KafkaPartition, Long  offsetsOfPartitions);\n}", 
+            "title": "OffsetManager"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_2", 
+            "text": "Map  KafkaPartition, Long  loadInitialOffsets() : 
Specifies the initial offset for consuming messages; called at the activation 
stage.  updateOffsets(Map KafkaPartition, Long  offsetsOfPartitions) : 
\u00a0This\nmethod is called at every repartitionCheckInterval to update 
offsets.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#partitioning", 
+            "text": "The logical instance of the KafkaInputOperator acts as 
the Partitioner\nas well as a StatsListener. This is because 
the\nAbstractKafkaInputOperator implements both 
the\ncom.datatorrent.api.Partitioner and 
com.datatorrent.api.StatsListener\ninterfaces and provides an implementation of 
definePartitions(...) and\nprocessStats(...) which makes it auto-scalable.", 
+            "title": "Partitioning"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#response-processstatsbatchedoperatorstats-stats",
 
+            "text": "The application master invokes this method on the logical 
instance with\nthe stats (tuplesProcessedPS, bytesPS, etc.) of each 
partition.\nRe-partitioning happens based on whether any new Kafka partitions 
added for\nthe topic or bytesPS and msgPS cross their respective upper 
bounds.", 
+            "title": "Response processStats(BatchedOperatorStats stats)"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#definepartitions", 
+            "text": "Based on the repartitionRequired field of the Response 
object which is\nreturned by processStats(...) method, the application master 
invokes\ndefinePartitions(...) on the logical instance which is also 
the\npartitioner instance. Dynamic partition can be disabled by setting 
the\nparameter repartitionInterval value to a negative value.", 
+            "title": "DefinePartitions"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#abstractsingleportkafkainputoperator", 
+            "text": "This class extends AbstractKafkaInputOperator to emit 
messages through single output port.", 
+            "title": "AbstractSinglePortKafkaInputOperator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#ports", 
+            "text": "outputPort  T : Tuples extracted from Kafka messages are 
emitted through this port.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_3", 
+            "text": "T getTuple(Message msg) : Converts the Kafka message to 
tuple.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#concrete-classes", 
+            "text": "KafkaSinglePortStringInputOperator: extends  
AbstractSinglePortKafkaInputOperator , extracts string from Kafka message.  
KafkaSinglePortByteArrayInputOperator: extends  
AbstractSinglePortKafkaInputOperator , extracts byte array from Kafka 
message.", 
+            "title": "Concrete Classes"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#application-example", 
+            "text": "This section builds an Apex application using Kafka input 
operator.\nBelow is the code snippet:  @ApplicationAnnotation(name =  KafkaApp 
)\npublic class ExampleKafkaApplication implements StreamingApplication\n{\n  
@Override\n  public void populateDAG(DAG dag, Configuration entries)\n  {\n    
KafkaSinglePortByteArrayInputOperator input =  dag.addOperator( MessageReader , 
new KafkaSinglePortByteArrayInputOperator());\n    ConsoleOutputOperator output 
= dag.addOperator( Output , new ConsoleOutputOperator());\n    dag.addStream( 
MessageData , input.outputPort, output.input);\n  }\n}  Below is the 
configuration for \u201ctest\u201d Kafka topic name 
and\n\u201clocalhost:2181\u201d is the zookeeper forum:  property \n   name 
dt.operator.MessageReader.prop.topic /name \n   value test /value  /property  
property \n   name dt.operator.KafkaInputOperator.prop.zookeeper /nam \n   
value localhost:2181 /value  /property", 
+            "title": "Application Example"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#kafka-input-operator-for-kafka-09x", 
+            "text": "Package:  org.apache.apex.malhar.kafka  Maven Artifact:  
malhar-kafka  This version uses the new 0.9 version of consumer API and works 
with Kafka broker version 0.9 and later.\nThe operator is fault-tolerant, 
scalable and supports input from multiple clusters and multiple topics in a 
single operator instance.", 
+            "title": "Kafka Input Operator for Kafka 0.9.x"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#pre-requisites_1", 
+            "text": "This operator requires version 0.9.0 or later of the 
Kafka Consumer API.", 
+            "title": "Pre-requisites"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#abstractkafkainputoperator_1", 
+            "text": "", 
+            "title": "AbstractKafkaInputOperator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#ports_1", 
+            "text": "This abstract class doesn't have any ports.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#configuration-properties", 
+            "text": "clusters  - String[]   Mandatory Parameter.  Specifies 
the Kafka clusters that you want to consume messages from. To configure 
multi-cluster support, you need to specify the clusters separated by \";\".     
topics  - String[]   Mandatory Parameter.  Specified the Kafka topics that you 
want to consume messages from. If you want multi-topic support, then specify 
the topics separated by \",\".     strategy  - PartitionStrategy    Operator 
supports two types of partitioning strategies,  ONE_TO_ONE  and  ONE_TO_MANY .  
ONE_TO_ONE : If this is enabled, the AppMaster creates one input operator 
instance per Kafka topic partition. So the number of Kafka topic partitions 
equals the number of operator instances. ONE_TO_MANY : The AppMaster creates K 
= min(initialPartitionCount, N) Kafka input operator instances where N is the 
number of Kafka topic partitions. If K is less than N, the remaining topic 
partitions are assigned to the K operator instances in round-robin fashion
 . If K is less than initialPartitionCount, the AppMaster creates one input 
operator instance per Kafka topic partition. For example, if 
initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster 
creates 2 Kafka input operator instances.\nDefault Value =  
PartitionStrategy.ONE_TO_ONE .      initialPartitionCount  - Integer   When the 
ONE_TO_MANY partition strategy is enabled, this value indicates the number of 
Kafka input operator instances. \n    Default Value = 1.     
repartitionInterval  - Long   Interval specified in milliseconds. This value 
specifies the minimum time required between two repartition actions. \n    
Default Value = 30 Seconds.     repartitionCheckInterval  - Long   Interval 
specified in milliseconds. This value specifies the minimum interval between 
two stat checks.\n    Default Value = 5 Seconds.     maxTuplesPerWindow  - 
Integer   Controls the maximum number of messages emitted in each streaming 
window from this operator. Minimum value is 1. 
 \n    Default value =  MAX_VALUE       initialOffset  - InitialOffset   
Indicates the type of offset i.e,  EARLIEST  or  LATEST  or  
APPLICATION_OR_EARLIEST  or  APPLICATION_OR_LATEST . \n     LATEST  =  Consume 
new messages from latest offset in the topic. \n     EARLIEST  =  Consume all 
messages available in the topic.\n     APPLICATION_OR_EARLIEST  =  Consume 
messages from committed position from last run. If there is no committed 
offset, then start consuming from beginning.\n     APPLICATION_OR_LATEST  =  
Consumes messages from committed position from last run. If a committed offset 
is unavailable, then start consuming from latest position.\n    Default value = 
 InitialOffset.APPLICATION_OR_LATEST     metricsRefreshInterval  - Long   
Interval specified in milliseconds. This value specifies the minimum interval 
between two metric stat updates.\n    Default value = 5 Seconds.     
consumerTimeout  - Long   Indicates the  time waiting in poll  when data is not 
available.\n    Defaul
 t value = 5 Seconds.     holdingBufferSize  - Long   Indicates the maximum 
number of messages kept in memory for emitting.\n    Default value = 1024.     
consumerProps  - Properties   Specify the [consumer 
properties[(http://kafka.apache.org/090/documentation.html#newconsumerconfigs) 
which are not yet set to the operator.     windowDataManager  - 
WindowDataManager   If set to a value other than the default, such as  
FSWindowDataManager , specifies that the operator will process the same set of 
messages in a window before and after a failure. This is important but it comes 
with higher cost because at the end of each window the operator needs to 
persist some state with respect to that window.\n    Default value =  
WindowDataManager.NoopWindowDataManager .", 
+            "title": "Configuration properties"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_4", 
+            "text": "void emitTuple(String cluster, ConsumerRecord byte[], 
byte[]  message) : Abstract method that emits tuples\nextracted from Kafka 
message.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#concrete-classes_1", 
+            "text": "", 
+            "title": "Concrete Classes"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#kafkasingleportinputoperator", 
+            "text": "This class extends from AbstractKafkaInputOperator and 
defines the  getTuple()  method which extracts byte array from Kafka message.", 
+            "title": "KafkaSinglePortInputOperator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#ports_2", 
+            "text": "outputPort  byte[] : Tuples extracted from Kafka messages 
are emitted through this port.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#application-example_1", 
+            "text": "This section builds an Apex application using Kafka input 
operator.\nBelow is the code snippet:  @ApplicationAnnotation(name =  KafkaApp 
)\npublic class ExampleKafkaApplication implements StreamingApplication\n{\n  
@Override\n  public void populateDAG(DAG dag, Configuration entries)\n  {\n    
KafkaSinglePortInputOperator input =  dag.addOperator( MessageReader , new 
KafkaSinglePortInputOperator());\n    ConsoleOutputOperator output = 
dag.addOperator( Output , new ConsoleOutputOperator());\n    dag.addStream( 
MessageData , input.outputPort, output.input);\n  }\n}  Below is the 
configuration for \u201ctest\u201d Kafka topic name 
and\n\u201clocalhost:9092\u201d is the Broker:  property \n   name 
dt.operator.MessageReader.prop.topics /name \n   value test /value  /property  
property \n   name dt.operator.KafkaInputOperator.prop.clusters /nam \n   value 
localhost:9092 /value  /property   By adding following lines to properties 
file, Kafka Input Operator supports mult
 i-topic and multi-cluster:  property \n   name 
dt.operator.MessageReader.prop.topics /name \n   value test1, test2 /value  
/property  property \n   name dt.operator.KafkaInputOperator.prop.clusters /nam 
\n   value localhost:9092; localhost:9093; localhost:9094 /value  /property   
For a full example application project, refer to 
https://github.com/DataTorrent/examples/tree/master/tutorials/kafka";, 
+            "title": "Application Example"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/", 
+            "text": "JMS INPUT OPERATOR\n\n\nIntroduction: About the JMS Input 
Operator\n\n\nThe JMS input operator consumes data from a messaging system 
using the JMS client API. JMS not being a communication protocol, the operator 
needs an underlying JMS client API library to talk to a messaging system. 
Currently the operator has been tested with the Amazon SQS and Apache ActiveMQ 
System brokers via their respective JMS client API libraries.\n\n\nWhy is it 
needed ?\n\n\nYou will need the operator to read data from a messaging system 
(e.g. Apache ActiveMQ) via the JMS client API. The operator supports both the 
publish-subscribe (topics) and point-to-point (queues) modes. The operator 
currently does not support partitioning and dynamic 
scalability.\n\n\nJMSBase\n\n\nThis class encapsulates various JMS properties 
and behaviors and maintains connections with the JMS broker. This is the base 
class for JMS input and output adaptor operators. Operators should not directly 
subclass JMSBas
 e but one of the JMS input or output 
operators.\n\n\nAbstractJMSInputOperator\n\n\nThis abstract implementation 
serves as the base class for consuming generic messages from an external 
messaging system. Concrete subclasses implement conversion and emit methods to 
emit tuples for a concrete type. JMSStringInputOperator is one such subclass in 
the library used for String messages. JMSObjectInputOperator is another one 
used for multiple message types where the user has the ability to get String, 
byte array, Map or POJO messages on the respective output 
ports.\n\n\nConfiguration Parameters\n\n\nCommon configuration parameters are 
described 
here.\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nDescription\n\n\n\n\n\n\nwindowDataManager\n\n\nThis
 is an instance of \nWindowDataManager\n that implements idempotency. 
Idempotency ensures that an operator will process the same set of messages in a 
window before and after a failure. For example, say the operator completed 
window 10 and failed before or d
 uring window 11. If the operator gets restored at window 10, it will replay 
the messages of window 10 which were saved from the previous run before the 
failure. Although important, idempotency comes at a price because an operator 
needs to persist some state at the end of each window. Default Value = 
\norg.apache.apex.malhar.lib.wal.FSWindowDataManager\n\n\n\n\n\n\nconnectionFactoryBuilder\n\n\nThe
 operator uses the builder pattern that requires the user to specify an 
instance of \ncom.datatorrent.lib.io.jms.JMSBase.ConnectionFactoryBuilder\n. 
This builder creates the connection factory that encapsulates the underlying 
JMS client API library (e.g. ActiveMQ or Amazon SQS). By default the operator 
uses \ncom.datatorrent.lib.io.jms.JMSBase.DefaultConnectionFactoryBuilder\n 
which is used for ActiveMQ. One of the examples below describes the Amazon SQS 
use-case. \n\n\n\n\n\n\n\n\nAbstract Methods\n\n\nThe following abstract 
methods need to be implemented by concrete subclasses.\n\n\nT con
 vert(Message message): This method converts a JMS Message object to type 
T.\n\n\nvoid emit(T payload): This method emits a tuple given the payload 
extracted from a JMS message.\n\n\nConcrete 
Classes\n\n\n\n\n\n\nJMSStringInputOperator :\nThis class extends 
AbstractJMSInputOperator to deliver String payloads in the 
tuple.\n\n\n\n\n\n\nJMSObjectInputOperator:\nThis class extends 
AbstractJMSInputOperator to deliver String, byte array, Map or POJO payloads in 
the tuple.\n\n\n\n\n\n\nApplication Examples\n\n\nActiveMQ Example\n\n\nThe 
source code for the tutorial can be found 
here:\n\n\nhttps://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ\n\n\nThe
 following code snippet from the example illustrates how the DAG is 
created:\n\n\n  @Override\n  public void populateDAG(DAG dag, Configuration 
conf)\n  {\n    JMSStringInputOperator amqInput = dag.addOperator(\namqIn\n, \n 
       new JMSStringInputOperator());\n\n    LineOutputOperator out = 
dag.addOperator(\nfileOut\n, new
  LineOutputOperator());\n\n    dag.addStream(\ndata\n, amqInput.output, 
out.input);\n  }\n\n\n\n\nThe DAG consists of only 2 operators: the 
\nJMSStringInputOperator\n which is the input operator that feeds received 
ActiveMQ messages into the output operator \nLineOutputOperator\n which outputs 
these messages into a file or files.\n\n\nThe default connectionFactoryBuilder 
supports ActiveMQ so there is no need to set this value. However the following 
ActiveMQ related values need to be set either from properties files or using 
the appropriate setter methods in the 
code:\n\n\n\n\n\n\n\n\n\n\n\n\nValue\n\n\nDescription\n\n\n\n\n\n\nconnectionFactoryProperties\n\n\nThis
 is a Map of key and value strings and can be set directly from configuration 
as in the example above. The table below describes the most important 
properties.\n\n\n\n\n\n\ntopic\n\n\nThis boolean value is set to true for the 
publish-subscribe case and false for the PTP (point-to-point) 
case.\n\n\n\n\n\n\nsubject\n\n\nThis 
 is the queue name for PTP (point-to-point) use-case and topic name for the 
publish-subscribe use case.\n\n\n\n\n\n\ndurable\n\n\nThis boolean value is set 
to true for durable subscriptionss, false otherwise. Durable subscriptions save 
messages to persistent storage until consumed. Used only when the clientId (see 
below) is set.\n\n\n\n\n\n\nclientId\n\n\nThe client-ID for this ActiveMQ 
consumer in the durable subscription mode as described 
above.\n\n\n\n\n\n\ntransacted\n\n\nThis boolean value is set to true for 
transacted JMS sessions as described in 
\n\nSession\n.\n\n\n\n\n\n\nackMode\n\n\nThis string value sets the 
acknowledgement mode as described in \n\nSession 
fields\n.\n\n\n\n\n\n\n\n\n\nThe following table describes the string 
properties to be set in the map that is passed in the 
connectionFactoryProperties value described 
above.\n\n\n\n\n\n\n\n\n\n\n\n\nProperty 
Name\n\n\nDescription\n\n\n\n\n\n\nbrokerURL\n\n\nThe \nconnection URL\n \nused 
to connect to the ActiveMQ broker
 \n\n\n\n\nuserName\n\n\nThe JMS userName used by connections created by this 
factory (optional when anonymous access is used)\n\n\n\n\n\n\npassword\n\n\nThe 
JMS password used for connections created from this factory (optional when 
anonymous access is used)\n\n\n\n\n\n\n\n\n\nThese properties can be set from 
the properties.xml file as shown below \n(from the example 
\nhttps://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ\n 
).\n\n\nconfiguration\n\n  \nproperty\n\n    
\nname\ndt.operator.amqIn.prop.connectionFactoryProperties.brokerURL\n/name\n\n 
   \nvalue\nvm://localhost\n/value\n\n  \n/property\n\n  \nproperty\n\n    
\nname\ndt.operator.amqIn.prop.subject\n/name\n\n    
\nvalue\njms4Amq\n/value\n\n  \n/property\n\n\n/configuration\n                 
                                                                                
       \n\n\n\n\nSQS Example\n\n\nThe source code for the tutorial can be found 
here:\n\n\nhttps://github.com/DataTorrent/examples/tree/m
 aster/tutorials/jmsSqs\n\n\nThe following code snippet from the example 
illustrates how the DAG is created:\n\n\n @Override\n public void 
populateDAG(DAG dag, Configuration conf)\n {\n\n   JMSStringInputOperator 
sqsInput = dag.addOperator(\nsqsIn\n, \n       new 
JMSStringInputOperator());\n\n   MyConnectionFactoryBuilder factoryBuilder = 
new MyConnectionFactoryBuilder();\n\n   factoryBuilder.sqsDevCredsFilename = 
conf.get(SQSDEV_CREDS_FILENAME_PROPERTY);\n\n   
sqsInput.setConnectionFactoryBuilder(factoryBuilder);\n\n   LineOutputOperator 
out = dag.addOperator(\nfileOut\n, new LineOutputOperator());\n\n   
dag.addStream(\ndata\n, sqsInput.output, out.input);\n }\n\n\n\n\nThe DAG 
consists of only 2 operators: the \nJMSStringInputOperator\n which is the input 
operator that feeds received SQS messages into the output operator 
\nLineOutputOperator\n which outputs these messages into a file or files. The 
code also shows how the AWS/SQS credentials are initialized in the factory 
builder. \n
 \n\nFor SQS you will have to provide a custom connectionFactoryBuilder as 
shown in the example above and in \nSQSConnectionFactory.java\n. The builder is 
typically used to supply AWS region and credential information that cannot be 
supplied via any JMS interfaces.\n\n\nThe following code snippet shows a 
typical Builder implementation that can be supplied to the operator. The AWS 
credentials are supplied via a \nPropertiesFileCredentialsProvider\n object in 
which sqsCredsFilename is the fully qualified path to a properties file from 
which the AWS security credentials are to be loaded. For example 
\n/etc/somewhere/credentials.properties\n\n\nstatic class 
MyConnectionFactoryBuilder implements JMSBase.ConnectionFactoryBuilder 
{\n\nString 
sqsCredsFilename;\n\nMyConnectionFactoryBuilder()\n{\n}\n\n@Override\npublic 
ConnectionFactory buildConnectionFactory() \n{\n  // Create the connection 
factory using the properties file credential provider.\n  // Connections this 
factory creates can tal
 k to the queues in us-east-1 region. \n  SQSConnectionFactory 
connectionFactory =\n    SQSConnectionFactory.builder()\n      
.withRegion(Region.getRegion(Regions.US_EAST_1))\n      
.withAWSCredentialsProvider(new 
PropertiesFileCredentialsProvider(sqsCredsFilename))\n      .build();\n    
return connectionFactory;\n  }\n}", 
+            "title": "JMS Input"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#jms-input-operator", 
+            "text": "", 
+            "title": "JMS INPUT OPERATOR"
+        }, 
+        {
+            "location": 
"/operators/jmsInputOperator/#introduction-about-the-jms-input-operator", 
+            "text": "The JMS input operator consumes data from a messaging 
system using the JMS client API. JMS not being a communication protocol, the 
operator needs an underlying JMS client API library to talk to a messaging 
system. Currently the operator has been tested with the Amazon SQS and Apache 
ActiveMQ System brokers via their respective JMS client API libraries.", 
+            "title": "Introduction: About the JMS Input Operator"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#why-is-it-needed", 
+            "text": "You will need the operator to read data from a messaging 
system (e.g. Apache ActiveMQ) via the JMS client API. The operator supports 
both the publish-subscribe (topics) and point-to-point (queues) modes. The 
operator currently does not support partitioning and dynamic scalability.", 
+            "title": "Why is it needed ?"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#jmsbase", 
+            "text": "This class encapsulates various JMS properties and 
behaviors and maintains connections with the JMS broker. This is the base class 
for JMS input and output adaptor operators. Operators should not directly 
subclass JMSBase but one of the JMS input or output operators.", 
+            "title": "JMSBase"
+        }, 
+        {
+            "location": 
"/operators/jmsInputOperator/#abstractjmsinputoperator", 
+            "text": "This abstract implementation serves as the base class for 
consuming generic messages from an external messaging system. Concrete 
subclasses implement conversion and emit methods to emit tuples for a concrete 
type. JMSStringInputOperator is one such subclass in the library used for 
String messages. JMSObjectInputOperator is another one used for multiple 
message types where the user has the ability to get String, byte array, Map or 
POJO messages on the respective output ports.", 
+            "title": "AbstractJMSInputOperator"
+        }, 
+        {
+            "location": 
"/operators/jmsInputOperator/#configuration-parameters", 
+            "text": "Common configuration parameters are described here.      
Parameter  Description    windowDataManager  This is an instance of  
WindowDataManager  that implements idempotency. Idempotency ensures that an 
operator will process the same set of messages in a window before and after a 
failure. For example, say the operator completed window 10 and failed before or 
during window 11. If the operator gets restored at window 10, it will replay 
the messages of window 10 which were saved from the previous run before the 
failure. Although important, idempotency comes at a price because an operator 
needs to persist some state at the end of each window. Default Value =  
org.apache.apex.malhar.lib.wal.FSWindowDataManager    connectionFactoryBuilder  
The operator uses the builder pattern that requires the user to specify an 
instance of  com.datatorrent.lib.io.jms.JMSBase.ConnectionFactoryBuilder . This 
builder creates the connection factory that encapsulates the underlying JMS cl
 ient API library (e.g. ActiveMQ or Amazon SQS). By default the operator uses  
com.datatorrent.lib.io.jms.JMSBase.DefaultConnectionFactoryBuilder  which is 
used for ActiveMQ. One of the examples below describes the Amazon SQS 
use-case.", 
+            "title": "Configuration Parameters"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#abstract-methods", 
+            "text": "The following abstract methods need to be implemented by 
concrete subclasses.  T convert(Message message): This method converts a JMS 
Message object to type T.  void emit(T payload): This method emits a tuple 
given the payload extracted from a JMS message.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#concrete-classes", 
+            "text": "JMSStringInputOperator :\nThis class extends 
AbstractJMSInputOperator to deliver String payloads in the tuple.    
JMSObjectInputOperator:\nThis class extends AbstractJMSInputOperator to deliver 
String, byte array, Map or POJO payloads in the tuple.", 
+            "title": "Concrete Classes"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#application-examples", 
+            "text": "", 
+            "title": "Application Examples"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#activemq-example", 
+            "text": "The source code for the tutorial can be found here:  
https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ  The 
following code snippet from the example illustrates how the DAG is created:    
@Override\n  public void populateDAG(DAG dag, Configuration conf)\n  {\n    
JMSStringInputOperator amqInput = dag.addOperator( amqIn , \n        new 
JMSStringInputOperator());\n\n    LineOutputOperator out = dag.addOperator( 
fileOut , new LineOutputOperator());\n\n    dag.addStream( data , 
amqInput.output, out.input);\n  }  The DAG consists of only 2 operators: the  
JMSStringInputOperator  which is the input operator that feeds received 
ActiveMQ messages into the output operator  LineOutputOperator  which outputs 
these messages into a file or files.  The default connectionFactoryBuilder 
supports ActiveMQ so there is no need to set this value. However the following 
ActiveMQ related values need to be set either from properties files or using 
the appropriate
  setter methods in the code:       Value  Description    
connectionFactoryProperties  This is a Map of key and value strings and can be 
set directly from configuration as in the example above. The table below 
describes the most important properties.    topic  This boolean value is set to 
true for the publish-subscribe case and false for the PTP (point-to-point) 
case.    subject  This is the queue name for PTP (point-to-point) use-case and 
topic name for the publish-subscribe use case.    durable  This boolean value 
is set to true for durable subscriptionss, false otherwise. Durable 
subscriptions save messages to persistent storage until consumed. Used only 
when the clientId (see below) is set.    clientId  The client-ID for this 
ActiveMQ consumer in the durable subscription mode as described above.    
transacted  This boolean value is set to true for transacted JMS sessions as 
described in  Session .    ackMode  This string value sets the acknowledgement 
mode as described in  Sessio
 n fields .     The following table describes the string properties to be set 
in the map that is passed in the connectionFactoryProperties value described 
above.       Property Name  Description    brokerURL  The  connection URL  
\nused to connect to the ActiveMQ broker   userName  The JMS userName used by 
connections created by this factory (optional when anonymous access is used)    
password  The JMS password used for connections created from this factory 
(optional when anonymous access is used)     These properties can be set from 
the properties.xml file as shown below \n(from the example  
https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ  ).  
configuration \n   property \n     name 
dt.operator.amqIn.prop.connectionFactoryProperties.brokerURL /name \n     value 
vm://localhost /value \n   /property \n   property \n     name 
dt.operator.amqIn.prop.subject /name \n     value jms4Amq /value \n   /property 
 /configuration", 
+            "title": "ActiveMQ Example"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#sqs-example", 
+            "text": "The source code for the tutorial can be found here:  
https://github.com/DataTorrent/examples/tree/master/tutorials/jmsSqs  The 
following code snippet from the example illustrates how the DAG is created:   
@Override\n public void populateDAG(DAG dag, Configuration conf)\n {\n\n   
JMSStringInputOperator sqsInput = dag.addOperator( sqsIn , \n       new 
JMSStringInputOperator());\n\n   MyConnectionFactoryBuilder factoryBuilder = 
new MyConnectionFactoryBuilder();\n\n   factoryBuilder.sqsDevCredsFilename = 
conf.get(SQSDEV_CREDS_FILENAME_PROPERTY);\n\n   
sqsInput.setConnectionFactoryBuilder(factoryBuilder);\n\n   LineOutputOperator 
out = dag.addOperator( fileOut , new LineOutputOperator());\n\n   
dag.addStream( data , sqsInput.output, out.input);\n }  The DAG consists of 
only 2 operators: the  JMSStringInputOperator  which is the input operator that 
feeds received SQS messages into the output operator  LineOutputOperator  which 
outputs these messages into a file or fil
 es. The code also shows how the AWS/SQS credentials are initialized in the 
factory builder.   For SQS you will have to provide a custom 
connectionFactoryBuilder as shown in the example above and in  
SQSConnectionFactory.java . The builder is typically used to supply AWS region 
and credential information that cannot be supplied via any JMS interfaces.  The 
following code snippet shows a typical Builder implementation that can be 
supplied to the operator. The AWS credentials are supplied via a  
PropertiesFileCredentialsProvider  object in which sqsCredsFilename is the 
fully qualified path to a properties file from which the AWS security 
credentials are to be loaded. For example  
/etc/somewhere/credentials.properties  static class MyConnectionFactoryBuilder 
implements JMSBase.ConnectionFactoryBuilder {\n\nString 
sqsCredsFilename;\n\nMyConnectionFactoryBuilder()\n{\n}\n\n@Override\npublic 
ConnectionFactory buildConnectionFactory() \n{\n  // Create the connection 
factory using the proper
 ties file credential provider.\n  // Connections this factory creates can talk 
to the queues in us-east-1 region. \n  SQSConnectionFactory connectionFactory 
=\n    SQSConnectionFactory.builder()\n      
.withRegion(Region.getRegion(Regions.US_EAST_1))\n      
.withAWSCredentialsProvider(new 
PropertiesFileCredentialsProvider(sqsCredsFilename))\n      .build();\n    
return connectionFactory;\n  }\n}", 
+            "title": "SQS Example"
+        }, 
+        {
+            "location": "/operators/file_splitter/", 
+            "text": "File Splitter\n\n\nThis is a simple operator whose main 
function is to split a file virtually and create metadata describing the files 
and the splits. \n\n\nWhy is it needed?\n\n\nIt is a common operation to read a 
file and parse it. This operation can be parallelized by having multiple 
partitions of such operators and each partition operating on different files. 
However, at times when a file is large then a single partition reading it can 
become a bottleneck.\nIn these cases, throughput can be increased if instances 
of the partitioned operator can read and parse non-overlapping sets of file 
blocks. This is where file splitter comes in handy. It creates metadata of 
blocks of file which serves as tasks handed out to downstream operator 
partitions. \nThe downstream partitions can read/parse the block without the 
need of interacting with other partitions.\n\n\nClass 
Diagram\n\n\n\n\nAbstractFileSplitter\n\n\nThe abstract implementation defines 
the logic of processi
 ng \nFileInfo\n. This comprises the following tasks -  \n\n\n\n\n\n\nbuilding 
\nFileMetadata\n per file and emitting it. This metadata contains the file 
information such as filepath, no. of blocks in it, length of the file, all the 
block ids, etc.\n\n\n\n\n\n\ncreating \nBlockMetadataIterator\n from 
\nFileMetadata\n. The iterator lazy-loads the block metadata when needed. We 
use an iterator because the no. of blocks in a file can be huge if the block 
size is small and loading all of them at once in memory may cause out of memory 
errors.\n\n\n\n\n\n\nretrieving \nBlockMetadata.FileBlockMetadata\n from the 
block metadata iterator and emitting it. The FileBlockMetadata contains the 
block id, start offset of the block, length of file in the block, etc. The 
number of block metadata emitted per window are controlled by 
\nblocksThreshold\n setting which by default is 1.  \n\n\n\n\n\n\nThe main 
utility method that performs all the above tasks is the \nprocess()\n method. 
Concrete implementa
 tions can invoke this method whenever they have data to 
process.\n\n\nPorts\n\n\nDeclares only output ports on which file metadata and 
block metadata are emitted.\n\n\n\n\nfilesMetadataOutput: metadata for each 
file is emitted on this port. \n\n\nblocksMetadataOutput: metadata for each 
block is emitted on this port. \n\n\n\n\nprocess()\n method\n\n\nWhen process() 
is invoked, any pending blocks from the current file are emitted on the 
'blocksMetadataOutput' port. If the threshold for blocks per window is still 
not met then a new input file is processed - corresponding metadata is emitted 
on 'filesMetadataOutput' and more of its blocks are emitted. This operation is 
repeated until the \nblocksThreshold\n is reached or there are no more new 
files.\n\n\n  protected void process()\n  {\n    if (blockMetadataIterator != 
null \n blockCount \n blocksThreshold) {\n      emitBlockMetadata();\n    }\n\n 
   FileInfo fileInfo;\n    while (blockCount \n blocksThreshold \n (fileInfo = 
getFileInfo
 ()) != null) {\n      if (!processFileInfo(fileInfo)) {\n        break;\n      
}\n    }\n  }\n\n\n\n\nAbstract methods\n\n\n\n\n\n\nFileInfo getFileInfo()\n: 
called from within the \nprocess()\n and provides the next file to 
process.\n\n\n\n\n\n\nlong getDefaultBlockSize()\n: provides the block size 
which is used when user hasn't configured the size.\n\n\n\n\n\n\nFileStatus 
getFileStatus(Path path)\n: provides the \norg.apache.hadoop.fs.FileStatus\n 
instance for a path.   \n\n\n\n\n\n\nConfiguration\n\n\n\n\nblockSize\n: size 
of a block.\n\n\nblocksThreshold\n: threshold on the number of blocks emitted 
by file splitter every window. This setting is used for throttling the work for 
downstream operators.\n\n\n\n\nFileSplitterBase\n\n\nSimple operator that 
receives tuples of type \nFileInfo\n on its \ninput\n port. \nFileInfo\n 
contains the information (currently just the file path) about the file which 
this operator uses to create file metadata and block metadata.\n\n\nExample 
applica
 tion\n\n\nThis is a simple sub-dag that demonstrates how FileSplitterBase can 
be plugged into an application.\n\n\n\nThe upstream operator emits tuples of 
type \nFileInfo\n on its output port which is connected to splitter input port. 
The downstream receives tuples of type \nBlockMetadata.FileBlockMetadata\n from 
the splitter's block metadata output port.\n\n\npublic class 
ApplicationWithBaseSplitter implements StreamingApplication\n{\n  @Override\n  
public void populateDAG(DAG dag, Configuration configuration)\n  {\n    
JMSInput input = dag.addOperator(\nInput\n, new JMSInput());\n    
FileSplitterBase splitter = dag.addOperator(\nSplitter\n, new 
FileSplitterBase());\n    FSSliceReader blockReader = 
dag.addOperator(\nBlockReader\n, new FSSliceReader());\n    ...\n    
dag.addStream(\nfile-info\n, input.output, splitter.input);\n    
dag.addStream(\nblock-metadata\n, splitter.blocksMetadataOutput, 
blockReader.blocksMetadataInput);\n    ...\n  }\n\n  public static class 
JMSInput extends
  AbstractJMSInputOperator\nAbstractFileSplitter.FileInfo\n\n  {\n\n    public 
final transient DefaultOutputPort\nAbstractFileSplitter.FileInfo\n output = new 
DefaultOutputPort\n();\n\n    @Override\n    protected 
AbstractFileSplitter.FileInfo convert(Message message) throws JMSException\n    
{\n      //assuming the message is a text message containing the absolute path 
of the file.\n      return new AbstractFileSplitter.FileInfo(null, 
((TextMessage)message).getText());\n    }\n\n    @Override\n    protected void 
emit(AbstractFileSplitter.FileInfo payload)\n    {\n      
output.emit(payload);\n    }\n  }\n}\n\n\n\n\nPorts\n\n\nDeclares an input port 
on which it receives tuples from the upstream operator. Output ports are 
inherited from AbstractFileSplitter.\n\n\n\n\ninput: non optional port on which 
tuples of type \nFileInfo\n are 
received.\n\n\n\n\nConfiguration\n\n\n\n\nfile\n: path of the file from which 
the filesystem is inferred. FileSplitter creates an instance of \norg.apache.h
 adoop.fs.FileSystem\n which is why this path is needed.  
\n\n\n\n\nFileSystem.newInstance(new Path(file).toUri(), new 
Configuration());\n\n\n\n\nThe fs instance is then used to fetch the default 
block size and \norg.apache.hadoop.fs.FileStatus\n for each file 
path.\n\n\nFileSplitterInput\n\n\nThis is an input operator that discovers 
files itself. The scanning of the directories for new files is asynchronous 
which is handled by \nTimeBasedDirectoryScanner\n. The function of 
TimeBasedDirectoryScanner is to periodically scan specified directories and 
find files which were newly added or modified. The interaction between the 
operator and the scanner is depicted in the diagram below.\n\n\n\n\nExample 
application\n\n\nThis is a simple sub-dag that demonstrates how 
FileSplitterInput can be plugged into an application.\n\n\n\n\nSplitter is the 
input operator here that sends block metadata to the downstream 
BlockReader.\n\n\n  @Override\n  public void populateDAG(DAG dag, Configuration 
confi
 guration)\n  {\n    FileSplitterInput input = dag.addOperator(\nInput\n, new 
FileSplitterInput());\n    FSSliceReader reader = dag.addOperator(\nBlock 
Reader\n, new FSSliceReader());\n    ...\n    dag.addStream(\nblock-metadata\n, 
input.blocksMetadataOutput, reader.blocksMetadataInput);\n    ...\n  
}\n\n\n\n\n\nPorts\n\n\nSince it is an input operator there are no input ports 
and output ports are inherited from 
AbstractFileSplitter.\n\n\nConfiguration\n\n\n\n\nscanner\n: the component that 
scans directories asynchronously. It is of type 
\ncom.datatorrent.lib.io.fs.FileSplitter.TimeBasedDirectoryScanner\n. The basic 
implementation of TimeBasedDirectoryScanner can be customized by users.  
\n\n\n\n\na. \nfiles\n: comma separated list of directories to scan.  \n\n\nb. 
\nrecursive\n: flag that controls whether the directories should be scanned 
recursively.  \n\n\nc. \nscanIntervalMillis\n: interval specified in 
milliseconds after which another scan iteration is triggered.  \n\n\nd. \nfil
 ePatternRegularExp\n: regular expression for accepted file names.  \n\n\ne. 
\ntrigger\n: a flag that triggers a scan iteration instantly. If the scanner 
thread is idling then it will initiate a scan immediately otherwise if a scan 
is in progress, then the new iteration will be triggered immediately after the 
completion of current one.\n2. \nidempotentStorageManager\n: by default 
FileSplitterInput is idempotent. \nIdempotency ensures that the operator will 
process the same set of files/blocks in a window if it has seen that window 
previously, i.e., before a failure. For example, let's say the operator 
completed window 10 and failed somewhere between window 11. If the operator 
gets restored at window 10 then it will process the same file/block again in 
window 10 which it did in the previous run before the failure. Idempotency is 
important but comes with higher cost because at the end of each window the 
operator needs to persist some state with respect to that window. Therefore, if 
one
  doesn't care about idempotency then they can set this property to be an 
instance of 
\ncom.datatorrent.lib.io.IdempotentStorageManager.NoopIdempotentStorageManager\n.\n\n\nHandling
 of split records\n\n\nSplitting of files to create tasks for downstream 
operator needs to be a simple operation that doesn't consume a lot of resources 
and is fast. This is why the file splitter doesn't open files to read. The 
downside of that is if the file contains records then a record may split across 
adjacent blocks. Handling of this is left to the downstream operator.\n\n\nWe 
have created Block readers in Apex-malhar library that handle line splits 
efficiently. The 2 line readers- \nAbstractFSLineReader\n and 
\nAbstractFSReadAheadLineReader\n can be found here 
\nAbstractFSBlockReader\n.", 
+            "title": "File Splitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#file-splitter", 
+            "text": "This is a simple operator whose main function is to split 
a file virtually and create metadata describing the files and the splits.", 
+            "title": "File Splitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#why-is-it-needed", 
+            "text": "It is a common operation to read a file and parse it. 
This operation can be parallelized by having multiple partitions of such 
operators and each partition operating on different files. However, at times 
when a file is large then a single partition reading it can become a 
bottleneck.\nIn these cases, throughput can be increased if instances of the 
partitioned operator can read and parse non-overlapping sets of file blocks. 
This is where file splitter comes in handy. It creates metadata of blocks of 
file which serves as tasks handed out to downstream operator partitions. \nThe 
downstream partitions can read/parse the block without the need of interacting 
with other partitions.", 
+            "title": "Why is it needed?"
+        }, 
+        {
+            "location": "/operators/file_splitter/#class-diagram", 
+            "text": "", 
+            "title": "Class Diagram"
+        }, 
+        {
+            "location": "/operators/file_splitter/#abstractfilesplitter", 
+            "text": "The abstract implementation defines the logic of 
processing  FileInfo . This comprises the following tasks -      building  
FileMetadata  per file and emitting it. This metadata contains the file 
information such as filepath, no. of blocks in it, length of the file, all the 
block ids, etc.    creating  BlockMetadataIterator  from  FileMetadata . The 
iterator lazy-loads the block metadata when needed. We use an iterator because 
the no. of blocks in a file can be huge if the block size is small and loading 
all of them at once in memory may cause out of memory errors.    retrieving  
BlockMetadata.FileBlockMetadata  from the block metadata iterator and emitting 
it. The FileBlockMetadata contains the block id, start offset of the block, 
length of file in the block, etc. The number of block metadata emitted per 
window are controlled by  blocksThreshold  setting which by default is 1.      
The main utility method that performs all the above tasks is the  process()  met
 hod. Concrete implementations can invoke this method whenever they have data 
to process.", 
+            "title": "AbstractFileSplitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#ports", 
+            "text": "Declares only output ports on which file metadata and 
block metadata are emitted.   filesMetadataOutput: metadata for each file is 
emitted on this port.   blocksMetadataOutput: metadata for each block is 
emitted on this port.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/file_splitter/#abstract-methods", 
+            "text": "FileInfo getFileInfo() : called from within the  
process()  and provides the next file to process.    long getDefaultBlockSize() 
: provides the block size which is used when user hasn't configured the size.   
 FileStatus getFileStatus(Path path) : provides the  
org.apache.hadoop.fs.FileStatus  instance for a path.", 
+            "title": "Abstract methods"
+        }, 
+        {
+            "location": "/operators/file_splitter/#configuration", 
+            "text": "blockSize : size of a block.  blocksThreshold : threshold 
on the number of blocks emitted by file splitter every window. This setting is 
used for throttling the work for downstream operators.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/file_splitter/#filesplitterbase", 
+            "text": "Simple operator that receives tuples of type  FileInfo  
on its  input  port.  FileInfo  contains the information (currently just the 
file path) about the file which this operator uses to create file metadata and 
block metadata.", 
+            "title": "FileSplitterBase"
+        }, 
+        {
+            "location": "/operators/file_splitter/#example-application", 
+            "text": "This is a simple sub-dag that demonstrates how 
FileSplitterBase can be plugged into an application.  The upstream operator 
emits tuples of type  FileInfo  on its output port which is connected to 
splitter input port. The downstream receives tuples of type  
BlockMetadata.FileBlockMetadata  from the splitter's block metadata output 
port.  public class ApplicationWithBaseSplitter implements 
StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, 
Configuration configuration)\n  {\n    JMSInput input = dag.addOperator( Input 
, new JMSInput());\n    FileSplitterBase splitter = dag.addOperator( Splitter , 
new FileSplitterBase());\n    FSSliceReader blockReader = dag.addOperator( 
BlockReader , new FSSliceReader());\n    ...\n    dag.addStream( file-info , 
input.output, splitter.input);\n    da

<TRUNCATED>

Reply via email to