http://git-wip-us.apache.org/repos/asf/apex-site/blob/a0eec8b5/docs/malhar-3.5/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/docs/malhar-3.5/mkdocs/search_index.json 
b/docs/malhar-3.5/mkdocs/search_index.json
new file mode 100644
index 0000000..524fd0a
--- /dev/null
+++ b/docs/malhar-3.5/mkdocs/search_index.json
@@ -0,0 +1,524 @@
+{
+    "docs": [
+        {
+            "location": "/", 
+            "text": "Apache Apex Malhar\n\n\nApache Apex Malhar is an open 
source operator and codec library that can be used with the \nApache Apex\n 
platform to build real-time streaming applications.  Enabling users to extract 
value quickly, Malhar operators help get data in, analyze it in real-time, and 
get data out of Hadoop.  In addition to the operators, the library contains a 
number of demo applications, demonstrating operator features and 
capabilities.\n\n\n\n\nCapabilities common across Malhar operators\n\n\nFor 
most streaming platforms, connectors are afterthoughts and often end up being 
simple \u2018bolt-ons\u2019 to the platform. As a result they often cause 
performance issues or data loss when put through failure scenarios and 
scalability requirements. Malhar operators do not face these issues as they 
were designed to be integral parts of Apex. Hence, they have the following core 
streaming runtime capabilities\n\n\n\n\nFault tolerance\n \u2013 Malhar 
operators where applicable have fault tolerance built in. They use the checkpoint capability 
provided by the framework to ensure that there is no data loss under ANY 
failure scenario.\n\n\nProcessing guarantees\n \u2013 Malhar operators where 
applicable provide out of the box support for ALL three processing guarantees 
\u2013 exactly once, at-least once, and at-most once WITHOUT requiring the user 
to write any additional code.  Some operators, like the MQTT operator, deal with 
source systems that cannot track processed data and hence need the operators 
to keep track of the data.  Malhar has support for a generic operator that uses 
alternate storage like HDFS to facilitate this.  Finally for databases that 
support transactions or support any sort of atomic batch operations Malhar 
operators can do exactly once down to the tuple level.\n\n\nDynamic updates\n 
\u2013 Based on changing business conditions you often have to tweak several 
parameters used by the operators in your streaming application without 
incurring any application downtime. You can also change properties of a Malhar 
operator at runtime without having to bring down the application.\n\n\nEase of 
extensibility\n \u2013 Malhar operators are based on templates that are easy to 
extend.\n\n\nPartitioning support\n \u2013 In streaming applications the input 
data stream often needs to be partitioned based on the contents of the stream. 
Also for operators that ingest data from external systems partitioning needs to 
be done based on the capabilities of the external system.  For example with 
Kafka, the operator can automatically scale up or down based on the changes in 
the number of Kafka partitions.\n\n\n\n\nOperator Library 
Overview\n\n\nInput/output connectors\n\n\nBelow is a summary of the various 
sub categories of input and output operators. Input operators also have a 
corresponding output operator\n\n\n\n\nFile Systems\n \u2013 Most streaming 
analytics use cases require the data to be stored in HDFS or perhaps S3 if the 
application is running in AWS.  Users often need to re-run their streaming analytical 
applications against historical data or consume data from upstream processes 
that are perhaps writing to some NFS share.  Apex supports input & output 
operators for HDFS, S3, NFS & Local Files.  There are also File Splitter and 
Block Reader operators, which can accelerate processing of large files by 
splitting and parallelizing the work across non-overlapping sets of file 
blocks.\n\n\nRelational Databases\n \u2013 Most stream processing use cases 
require some reference data lookups to enrich, tag or filter streaming data. 
There is also a need to save results of the streaming analytical computation to 
a database so an operational dashboard can see them. Apex supports a JDBC 
operator so you can read/write data from any JDBC compliant RDBMS like Oracle, 
MySQL, SQLite, etc.\n\n\nNoSQL Databases\n \u2013 NoSQL key-value pair 
databases like Cassandra & HBase are a common part of streaming analytics 
application architectures to look up reference data or store results.  Malhar has 
operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and 
CouchDB.\n\n\nMessaging Systems\n \u2013 Kafka, JMS, and similar systems are 
the workhorses of messaging infrastructure in most enterprises.  Malhar has a 
robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, 
and RabbitMQ messages.\n\n\nNotification Systems\n \u2013 Malhar includes an 
operator for sending notifications via SMTP.\n\n\nIn-memory Databases & 
Caching platforms\n - Some streaming use cases need instantaneous access to 
shared state across the application. Caching platforms and in-memory databases 
serve this purpose really well. To support these use cases, Malhar has 
operators for memcached and Redis.\n\n\nSocial Media\n - Malhar includes an 
operator to connect to the popular Twitter stream fire hose.\n\n\nProtocols\n - 
Malhar provides connectors that can communicate in HTTP, RSS, Socket, 
WebSocket, FTP, and MQTT.\n\n\n\n\nParsers\n\n\nThere are many industry vertical specific data 
formats that a streaming application developer might need to parse. Often there 
are existing parsers available for these that can be directly plugged into an 
Apache Apex application. For example in the Telco space, a Java based CDR 
parser can be directly plugged into an Apache Apex operator. To further simplify 
the development experience, Malhar also provides some operators for parsing common 
formats like XML (DOM & SAX), JSON (flat map converter), Apache log files, 
syslog, etc.\n\n\nStream manipulation\n\n\nStreaming data inevitably needs 
processing to clean, filter, tag, summarize, etc. The goal of Malhar is to 
enable the application developer to focus on WHAT needs to be done to the 
stream to get it in the right format and not worry about the HOW.  Malhar has 
several operators to perform the common stream manipulation actions like \u2013 
GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, 
Outer join, Select, Update etc.\n\n\nCompute\n\n\nOne of the most important 
promises of a streaming analytics platform like Apache Apex is the ability to 
do analytics in real-time. However delivering on the promise becomes really 
difficult when the platform does not provide out of the box operators to 
support a variety of common compute functions as the user then has to worry about 
making these scalable, fault tolerant, stateful, etc.  Malhar takes this 
responsibility away from the application developer by providing a variety of 
out of the box computational operators.\n\n\nBelow is just a snapshot of the 
compute operators available in Malhar\n\n\n\n\nStatistics and math - Various 
mathematical and statistical computations over application defined time 
windows.\n\n\nFiltering and pattern matching\n\n\nSorting, maps, frequency, 
TopN, BottomN\n\n\nRandom data generators\n\n\n\n\nLanguages 
Support\n\n\nMigrating to a new platform often requires re-use of the existing 
code that would be difficult or time-consuming to re-write.  With this in mind, Malhar supports 
invocation of code written in other languages by wrapping them in one of the 
library operators, and allows execution of software written 
in:\n\n\n\n\nJavaScript\n\n\nPython\n\n\nR\n\n\nRuby", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#apache-apex-malhar", 
+            "text": "Apache Apex Malhar is an open source operator and codec 
library that can be used with the  Apache Apex  platform to build real-time 
streaming applications.  Enabling users to extract value quickly, Malhar 
operators help get data in, analyze it in real-time, and get data out of 
Hadoop.  In addition to the operators, the library contains a number of demo 
applications, demonstrating operator features and capabilities.", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#capabilities-common-across-malhar-operators", 
+            "text": "For most streaming platforms, connectors are 
afterthoughts and often end up being simple \u2018bolt-ons\u2019 to the 
platform. As a result they often cause performance issues or data loss when put 
through failure scenarios and scalability requirements. Malhar operators do not 
face these issues as they were designed to be integral parts of Apex. Hence, 
they have the following core streaming runtime capabilities   Fault tolerance  
\u2013 Malhar operators where applicable have fault tolerance built in. They 
use the checkpoint capability provided by the framework to ensure that there is 
no data loss under ANY failure scenario.  Processing guarantees  \u2013 Malhar 
operators where applicable provide out of the box support for ALL three 
processing guarantees \u2013 exactly once, at-least once, and at-most once 
WITHOUT requiring the user to write any additional code.  Some operators, like 
the MQTT operator, deal with source systems that cannot track processed data and 
hence need the operators to keep track of the data.  Malhar has support for a generic 
operator that uses alternate storage like HDFS to facilitate this.  Finally for 
databases that support transactions or support any sort of atomic batch 
operations Malhar operators can do exactly once down to the tuple level.  
Dynamic updates  \u2013 Based on changing business conditions you often have to 
tweak several parameters used by the operators in your streaming application 
without incurring any application downtime. You can also change properties of a 
Malhar operator at runtime without having to bring down the application.  Ease 
of extensibility  \u2013 Malhar operators are based on templates that are easy 
to extend.  Partitioning support  \u2013 In streaming applications the input 
data stream often needs to be partitioned based on the contents of the stream. 
Also for operators that ingest data from external systems partitioning needs to 
be done based on the capabilities of the external system.  
For example with Kafka, the operator can automatically scale up or down based on 
the changes in the number of Kafka partitions.", 
+            "title": "Capabilities common across Malhar operators"
+        }, 
+        {
+            "location": "/#operator-library-overview", 
+            "text": "", 
+            "title": "Operator Library Overview"
+        }, 
+        {
+            "location": "/#inputoutput-connectors", 
+            "text": "Below is a summary of the various sub categories of input 
and output operators. Input operators also have a corresponding output operator 
  File Systems  \u2013 Most streaming analytics use cases require the data to 
be stored in HDFS or perhaps S3 if the application is running in AWS.  Users 
often need to re-run their streaming analytical applications against historical 
data or consume data from upstream processes that are perhaps writing to some 
NFS share.  Apex supports input & output operators for HDFS, S3, NFS & Local 
Files.  There are also File Splitter and Block Reader operators, which can 
accelerate processing of large files by splitting and parallelizing the work 
across non-overlapping sets of file blocks.  Relational Databases  \u2013 Most 
stream processing use cases require some reference data lookups to enrich, tag 
or filter streaming data. There is also a need to save results of the streaming 
analytical computation to a database so an operational dashboard can see them. 
Apex supports a JDBC operator so you can read/write data 
from any JDBC compliant RDBMS like Oracle, MySQL, SQLite, etc.  NoSQL Databases 
 \u2013 NoSQL key-value pair databases like Cassandra & HBase are a common part 
of streaming analytics application architectures to look up reference data or 
store results.  Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, 
MongoDB, and CouchDB.  Messaging Systems  \u2013 Kafka, JMS, and similar 
systems are the workhorses of messaging infrastructure in most enterprises.  
Malhar has a robust, industry-tested set of operators to read and write Kafka, 
JMS, ZeroMQ, and RabbitMQ messages.  Notification Systems  \u2013 Malhar 
includes an operator for sending notifications via SMTP.  In-memory Databases & 
Caching platforms  - Some streaming use cases need instantaneous access to 
shared state across the application. Caching platforms and in-memory databases 
serve this purpose really well. To support these use cases, Malhar has 
operators for memcached and Redis.  Social Media  - Malhar includes an 
operator to connect to the popular Twitter stream fire hose.  Protocols  - 
Malhar provides connectors that can communicate in HTTP, RSS, Socket, 
WebSocket, FTP, and MQTT.", 
+            "title": "Input/output connectors"
+        }, 
+        {
+            "location": "/#parsers", 
+            "text": "There are many industry vertical specific data formats 
that a streaming application developer might need to parse. Often there are 
existing parsers available for these that can be directly plugged into an 
Apache Apex application. For example in the Telco space, a Java based CDR 
parser can be directly plugged into an Apache Apex operator. To further simplify 
the development experience, Malhar also provides some operators for parsing common 
formats like XML (DOM & SAX), JSON (flat map converter), Apache log files, 
syslog, etc.", 
+            "title": "Parsers"
+        }, 
+        {
+            "location": "/#stream-manipulation", 
+            "text": "Streaming data inevitably needs processing to clean, 
filter, tag, summarize, etc. The goal of Malhar is to enable the application 
developer to focus on WHAT needs to be done to the stream to get it in the 
right format and not worry about the HOW.  Malhar has several operators to 
perform the common stream manipulation actions like \u2013 GroupBy, Join, 
Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, Outer join, Select, 
Update etc.", 
+            "title": "Stream manipulation"
+        }, 
+        {
+            "location": "/#compute", 
+            "text": "One of the most important promises of a streaming 
analytics platform like Apache Apex is the ability to do analytics in 
real-time. However delivering on the promise becomes really difficult when the 
platform does not provide out of the box operators to support a variety of common 
compute functions as the user then has to worry about making these scalable, 
fault tolerant, stateful, etc.  Malhar takes this responsibility away from the 
application developer by providing a variety of out of the box computational 
operators.  Below is just a snapshot of the compute operators available in 
Malhar   Statistics and math - Various mathematical and statistical 
computations over application defined time windows.  Filtering and pattern 
matching  Sorting, maps, frequency, TopN, BottomN  Random data generators", 
+            "title": "Compute"
+        }, 
+        {
+            "location": "/#languages-support", 
+            "text": "Migrating to a new platform often requires re-use of the 
existing code that would be difficult or time-consuming to re-write.  With this 
in mind, Malhar supports invocation of code written in other languages by 
wrapping them in one of the library operators, and allows execution of software 
written in:   JavaScript  Python  R  Ruby", 
+            "title": "Languages Support"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/", 
+            "text": "KAFKA INPUT OPERATOR\n\n\nIntroduction: About Kafka Input 
Operator\n\n\nThis is an input operator that consumes data from Kafka messaging 
system for further processing in Apex. Kafka Input Operator is a 
fault-tolerant and scalable Malhar Operator.\n\n\nWhy is it needed ?\n\n\nKafka 
is a pull-based and distributed publish subscribe messaging system in which topics are 
partitioned and replicated across\nnodes. Kafka input operator is needed when 
you want to read data from multiple\npartitions of a Kafka topic in parallel in 
an Apex application.\n\n\nAbstractKafkaInputOperator\n\n\nThis is the abstract 
implementation that serves as base class for consuming messages from Kafka 
messaging system. This class doesn\u2019t have any 
ports.\n\n\n\n\nConfiguration 
Parameters\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nDescription\n\n\n\n\n\n\nmaxTuplesPerWindow\n\n\nControls
 the maximum number of messages emitted in each streaming window from this 
operator. Minimum value is 1. Default value = MAX_VALUE 
\n\n\n\n\n\n\nidempotentStorageManager\n\n\nThis is an 
instance of IdempotentStorageManager. Idempotency ensures that the operator 
will process the same set of messages in a window before and after a failure. 
For example, let's say the operator completed window 10 and failed somewhere 
in window 11. If the operator gets restored at window 10 then it will 
process the same messages again in window 10 which it did in the previous run 
before the failure. Idempotency is important but comes with higher cost because 
at the end of each window the operator needs to persist some state with respect 
to that window. Default Value = 
com.datatorrent.lib.io.IdempotentStorageManager.\nNoopIdempotentStorageManager\n\n\n\n\n\n\nstrategy\n\n\nOperator
 supports two types of partitioning strategies, ONE_TO_ONE and 
ONE_TO_MANY.\n\n\nONE_TO_ONE: If this is enabled, the AppMaster creates one 
input operator instance per Kafka topic partition. So the number of Kafka topic 
partitions equals the number of operator instances.\n\n\nONE_TO_MANY: The AppMaster creates 
K = min(initialPartitionCount, N) Kafka input operator instances where N is the 
number of Kafka topic partitions. If K is less than N, the remaining topic 
partitions are assigned to the K operator instances in round-robin fashion. If 
K is less than initialPartitionCount, the AppMaster creates one input operator 
instance per Kafka topic partition. For example, if initialPartitionCount = 5 
and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input 
operator instances.\nDefault Value = 
ONE_TO_ONE\n\n\n\n\n\n\nmsgRateUpperBound\n\n\nMaximum messages upper bound. 
Operator repartitions when the \nmsgProcessedPS\n exceeds this bound. 
\nmsgProcessedPS\n is the average number of messages processed per second by 
this operator.\n\n\n\n\n\n\nbyteRateUpperBound\n\n\nMaximum bytes upper bound. 
Operator repartitions when the \nbytesPS\n exceeds this bound. \nbytesPS\n is 
the average number of bytes processed per second by this 
operator.\n\n\n\n\n\n\n\n\noffsetManager\n\n\nThis is an 
optional parameter that is useful when the application restarts or starts at 
specific offsets (offsets are explained 
below)\n\n\n\n\n\n\nrepartitionInterval\n\n\nInterval specified in 
milliseconds. This value specifies the minimum time required between two 
repartition actions. Default Value = 30 
Seconds\n\n\n\n\n\n\nrepartitionCheckInterval\n\n\nInterval specified in 
milliseconds. This value specifies the minimum interval between two offset 
updates. Default Value = 5 Seconds\n\n\n\n\n\n\ninitialPartitionCount\n\n\nWhen 
the ONE_TO_MANY partition strategy is enabled, this value indicates the number 
of Kafka input operator instances. Default Value = 
1\n\n\n\n\n\n\nconsumer\n\n\nThis is an instance of 
com.datatorrent.contrib.kafka.KafkaConsumer. Default Value = Instance of 
SimpleKafkaConsumer.\n\n\n\n\n\n\n\n\nAbstract Methods\n\n\nvoid 
emitTuple(Message message): Abstract method that emits tuples\nextracted 
from Kafka message.\n\n\nKafkaConsumer\n\n\nThis is an abstract implementation 
of Kafka consumer. It sends the fetch\nrequests to the leader brokers of Kafka 
partitions. For each request,\nit receives the set of messages and stores them 
in a buffer, which is an\nArrayBlockingQueue. There are two implementations: SimpleKafkaConsumer 
extends\nKafkaConsumer and provides the functionality of the Simple Consumer API, 
and\nHighLevelKafkaConsumer extends KafkaConsumer and provides 
the\nfunctionality of the High Level Consumer API.\n\n\nPre-requisites\n\n\nThis 
operator uses the Kafka Consumer API of version\n0.8.1.1. So, this operator 
will work with any 0.8.x and 0.7.x version of Apache Kafka.\n\n\nConfiguration 
Parameters\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nType\n\n\nDefault\n\n\nDescription\n\n\n\n\n\n\nzookeeper\n\n\nString\n\n\n\n\nSpecifies
 the zookeeper quorum of Kafka clusters that you want to consume messages from. 
zookeeper is a string in the form of 
hostname1:port1,hostname2:port2,hostname3:port3 where hostname1,hostname2,hostname3 are hosts and 
port1,port2,port3 are ports of the zookeeper servers. If the topic name is the 
same across the Kafka clusters and you want to consume data from these clusters, 
then configure the zookeeper as follows: 
c1::hs1:p1,hs2:p2,hs3:p3;c2::hs4:p4,hs5:p5;c3::hs6:p6\n\n\nwhere\n\n\nc1,c2,c3 
indicate the cluster names, hs1,hs2,hs3,hs4,hs5,hs6 are zookeeper hosts and 
p1,p2,p3,p4,p5,p6 are the corresponding ports. The cluster name is optional in 
the case of a single cluster.\n\n\n\n\n\n\ncacheSize\n\n\nint\n\n\n1024\n\n\nMaximum 
number of buffered messages held in 
memory.\n\n\n\n\n\n\ntopic\n\n\nString\n\n\ndefault_topic\n\n\nIndicates the 
name of the 
topic.\n\n\n\n\n\n\ninitialOffset\n\n\nString\n\n\nlatest\n\n\nIndicates the 
type of offset, i.e., \u201cearliest\u201d or \u201clatest\u201d. If initialOffset is 
\u201clatest\u201d, then the operator consumes messages from the latest point of the 
Kafka queue. If initialOffset is \u201cearliest\u201d, then the operator 
consumes messages starting from the beginning of the message queue. This can be overridden by 
OffsetManager.\n\n\n\n\n\n\n\n\n\nAbstract Methods\n\n\n\n\nvoid 
commitOffset(): Commit the offsets at checkpoint.\n\n\nMap<KafkaPartition, 
Long> getCurrentOffsets(): Return the current\n    offset 
status.\n\n\nresetPartitionsAndOffset(Set<KafkaPartition> partitionIds,\n    
Map<KafkaPartition, Long> startOffset): Reset the partitions with\n    
partitionIds and offsets with startOffset.\n\n\n\n\nConfiguration 
Parameters for 
SimpleKafkaConsumer\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nType\n\n\nDefault\n\n\nDescription\n\n\n\n\n\n\nbufferSize\n\n\nint\n\n\n1
 MB\n\n\nSpecifies the maximum total size of messages for each fetch 
request.\n\n\n\n\n\n\nmetadataRefreshInterval\n\n\nint\n\n\n30 
Seconds\n\n\nInterval, in milliseconds, between refreshes of the broker metadata. 
Enabling metadata refresh guarantees an automatic reconnect when 
a new broker is elected as the host. A value of -1 disables this 
feature.\n\n\n\n\n\n\nmetadataRefreshRetryLimit\n\n\nint\n\n\n-1\n\n\nSpecifies 
the maximum number of retries for refreshing broker metadata. -1 means unlimited 
retry.\n\n\n\n\n\n\n\n\n\nOffsetManager\n\n\nThis is an interface for offset 
management and is useful when consuming data\nfrom specified offsets. It updates 
the offsets for all the Kafka partitions\nperiodically. Below is the code 
snippet:\n\n\npublic interface 
OffsetManager\n{\n  public Map<KafkaPartition, Long> loadInitialOffsets();\n  
public void updateOffsets(Map<KafkaPartition, Long> 
offsetsOfPartitions);\n}\n\n\n\n\nAbstract Methods\n\n\nMap<KafkaPartition, 
Long> loadInitialOffsets(): Specifies the initial offset for consuming 
messages; called at the activation stage.\n\n\nupdateOffsets(Map
<KafkaPartition, Long> offsetsOfPartitions): This\nmethod is called at 
every repartitionCheckInterval to update offsets.\n\n\nPartitioning\n\n\nThe 
logical instance of the KafkaInputOperator acts as the Partitioner\nas well as a 
StatsListener. This is because the\nAbstractKafkaInputOperator implements both 
the\ncom.datatorrent.api.Partitioner and 
com.datatorrent.api.StatsListener\ninterfaces and provides an implementation of 
definePartitions(...) and\nprocessStats(...) which makes it 
auto-scalable.\n\n\nResponse processStats(BatchedOperatorStats stats)\n\n\nThe 
application master invokes this method on the logical instance with\nthe stats 
(tuplesProcessedPS, bytesPS, etc.) of each partition.\nRe-partitioning happens 
based on whether any new Kafka partitions were added for\nthe topic or bytesPS and 
msgPS cross their respective upper bounds.\n\n\nDefinePartitions\n\n\nBased on 
the repartitionRequired field of the Response object which is\nreturned by 
processStats(...) method, the application master invokes\ndefinePartitions(...) 
on the logical instance which is also the\npartitioner instance. Dynamic 
partitioning can be disabled by setting the\nparameter 
repartitionInterval to a negative 
value.\n\n\nAbstractSinglePortKafkaInputOperator\n\n\nThis class extends 
AbstractKafkaInputOperator and has a single output\nport, through which it emits 
messages.\n\n\nPorts\n\n\noutputPort <T>: Tuples extracted 
from Kafka messages are emitted through\nthis port.\n\n\nAbstract 
Methods\n\n\nT getTuple(Message msg): Converts the Kafka message to a 
tuple.\n\n\nConcrete Classes\n\n\n\n\n\n\nKafkaSinglePortStringInputOperator 
:\nThis class extends AbstractSinglePortKafkaInputOperator and getTuple() 
method extracts string from Kafka 
message.\n\n\n\n\n\n\nKafkaSinglePortByteArrayInputOperator:\nThis class 
extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts 
byte array from Kafka message.\n\n\n\n\n\n\nApplication Example\n\n\nThis 
section builds an Apex application using Kafka input operator.\nBelow is the 
code snippet:\n\n\n@ApplicationAnnotation(name = \"KafkaApp\")\npublic class 
ExampleKafkaApplication implements StreamingApplication\n{\n@Override\npublic void populateDAG(DAG dag, 
Configuration entries)\n{\n  KafkaSinglePortByteArrayInputOperator input =  
dag.addOperator(\"MessageReader\", new 
KafkaSinglePortByteArrayInputOperator());\n\n  ConsoleOutputOperator output = 
dag.addOperator(\"Output\", new ConsoleOutputOperator());\n\n  
dag.addStream(\"MessageData\", input.outputPort, 
output.input);\n}\n}\n\n\n\n\nBelow is the configuration, where \u201ctest\u201d 
is the Kafka topic name and\n\u201clocalhost:2181\u201d is the zookeeper 
quorum:\n\n\n<property>\n  <name>dt.operator.MessageReader.prop.topic</name>\n  <value>test</value>\n</property>\n\n<property>\n  <name>dt.operator.MessageReader.prop.zookeeper</name>\n  <value>localhost:2181</value>\n</property>",
 
+            "title": "Kafka Input"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#kafka-input-operator", 
+            "text": "", 
+            "title": "KAFKA INPUT OPERATOR"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#introduction-about-kafka-input-operator", 
+            "text": "This is an input operator that consumes data from Kafka 
messaging system for further processing in Apex. Kafka Input Operator is a 
fault-tolerant and scalable Malhar Operator.", 
+            "title": "Introduction: About Kafka Input Operator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#why-is-it-needed", 
+            "text": "Kafka is a pull-based and distributed publish subscribe 
messaging system in which topics are partitioned and replicated across\nnodes. Kafka 
input operator is needed when you want to read data from multiple\npartitions 
of a Kafka topic in parallel in an Apex application.", 
+            "title": "Why is it needed ?"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#abstractkafkainputoperator", 
+            "text": "This is the abstract implementation that serves as base 
class for consuming messages from Kafka messaging system. This class 
doesn\u2019t have any ports.", 
+            "title": "AbstractKafkaInputOperator"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#configuration-parameters", 
+            "text": "Parameter  Description    maxTuplesPerWindow  Controls 
the maximum number of messages emitted in each streaming window from this 
operator. Minimum value is 1. Default value = MAX_VALUE     
idempotentStorageManager  This is an instance of IdempotentStorageManager. 
Idempotency ensures that the operator will process the same set of messages in 
a window before and after a failure. For example, let's say the operator 
completed window 10 and failed somewhere in window 11. If the operator 
gets restored at window 10 then it will process the same messages again in 
window 10 which it did in the previous run before the failure. Idempotency is 
important but comes with higher cost because at the end of each window the 
operator needs to persist some state with respect to that window. Default Value 
= com.datatorrent.lib.io.IdempotentStorageManager. NoopIdempotentStorageManager 
   strategy  Operator supports two types of partitioning strategies, ONE_TO_ONE 
and ONE_TO_MANY.  ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator 
instance per Kafka topic partition. So the number of Kafka topic partitions 
equals the number of operator instances.  ONE_TO_MANY: The AppMaster creates K 
= min(initialPartitionCount, N) Kafka input operator instances where N is the 
number of Kafka topic partitions. If K is less than N, the remaining topic 
partitions are assigned to the K operator instances in round-robin fashion. If 
K is less than initialPartitionCount, the AppMaster creates one input operator 
instance per Kafka topic partition. For example, if initialPartitionCount = 5 
and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input 
operator instances.\nDefault Value = ONE_TO_ONE    msgRateUpperBound  Maximum 
messages upper bound. Operator repartitions when the  msgProcessedPS  exceeds 
this bound.  msgProcessedPS  is the average number of messages processed per 
second by this operator.    byteRateUpperBound  Maximum bytes upper bound. 
Operator repartitions when the  bytesPS  exceeds this bound.  bytesPS  is 
the average number of bytes processed per second by this operator.     
offsetManager  This is an optional parameter that is useful when the 
application restarts or starts at specific offsets (offsets are explained below) 
   repartitionInterval  Interval specified in milliseconds. This value 
specifies the minimum time required between two repartition actions. Default 
Value = 30 Seconds    repartitionCheckInterval  Interval specified in 
milliseconds. This value specifies the minimum interval between two offset 
updates. Default Value = 5 Seconds    initialPartitionCount  When the 
ONE_TO_MANY partition strategy is enabled, this value indicates the number of 
Kafka input operator instances. Default Value = 1    consumer  This is an 
instance of com.datatorrent.contrib.kafka.KafkaConsumer. Default Value = 
Instance of SimpleKafkaConsumer.", 
+            "title": "Configuration Parameters"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods", 
+            "text": "void emitTuple(Message message): Abstract method that 
emits tuples\nextracted from Kafka message.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#kafkaconsumer", 
+            "text": "This is an abstract implementation of Kafka consumer. It 
sends the fetch\nrequests to the leader brokers of Kafka partitions. For each 
request,\nit receives the set of messages and stores them in a buffer, which 
is an\nArrayBlockingQueue. There are two implementations: SimpleKafkaConsumer extends\nKafkaConsumer 
and provides the functionality of the Simple Consumer API, and\nHighLevelKafkaConsumer 
extends KafkaConsumer and provides the\nfunctionality of the High Level 
Consumer API.", 
+            "title": "KafkaConsumer"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#pre-requisites", 
+            "text": "This operator uses the Kafka Consumer API of 
version\n0.8.1.1, so it works with any 0.8.x and 0.7.x version 
of Apache Kafka.", 
+            "title": "Pre-requisites"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#configuration-parameters_1", 
+            "text": "Parameter  Type  Default  Description    zookeeper  
String   Specifies the zookeeper quorum of Kafka clusters that you want to 
consume messages from. zookeeper \u00a0is a string in the form of 
hostname1:port1,hostname2:port2,hostname3:port3 \u00a0where 
hostname1,hostname2,hostname3 are hosts and port1,port2,port3 are ports of 
zookeeper server. \u00a0If the topic name is the same across the Kafka clusters 
and you want to consume data from these clusters, then configure the zookeeper as 
follows: c1::hs1:p1,hs2:p2,hs3:p3;c2::hs4:p4,hs5:p5;c3::hs6:p6  where  c1,c2,c3 
indicate the cluster names, hs1,hs2,hs3,hs4,hs5,hs6 are zookeeper hosts and 
p1,p2,p3,p4,p5,p6 are the corresponding ports. The cluster name is optional in 
the case of a single cluster.    cacheSize  int  1024  Maximum number of buffered messages 
held in memory.    topic  String  default_topic  Indicates the name of the 
topic.    initialOffset  String  latest  Indicates the type of offset, i.e., 
\u201cearliest\u201d or \u201clatest\u201d. 
If initialOffset is \u201clatest\u201d, then the operator consumes messages 
from the latest point of the Kafka queue. If initialOffset is \u201cearliest\u201d, 
then the operator consumes messages starting from the beginning of the message queue. This can be 
overridden by OffsetManager.", 
+            "title": "Configuration Parameters"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_1", 
+            "text": "void commitOffset(): Commit the offsets at checkpoint.  
Map<KafkaPartition, Long> getCurrentOffsets(): Return the current\n    offset 
status.  resetPartitionsAndOffset(Set<KafkaPartition> partitionIds,\n    
Map<KafkaPartition, Long> startOffset): Reset the partitions with\n    partitionIds 
and offsets with startOffset.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#configuration-parameters-for-simplekafkaconsumer",
 
+            "text": "Parameter  Type  Default  Description    bufferSize  int  
1 MB  Specifies the maximum total size of messages for each fetch request.    
metadataRefreshInterval  int  30 Seconds  Interval, in milliseconds, between 
refreshes of the metadata (broker changes). Enabling metadata refresh 
guarantees an automatic reconnect when a new broker is elected as the host. A 
value of -1 disables this feature.    metadataRefreshRetryLimit  int  -1  
Specifies the maximum number of retries for refreshing broker metadata. -1 means unlimited 
retries.", 
+            "title": "Configuration Parameters\u00a0for SimpleKafkaConsumer"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#offsetmanager", 
+            "text": "This is an interface for offset management and is useful 
when consuming data\nfrom specified offsets. It updates the offsets for all the 
Kafka partitions\nperiodically. Below is the code 
snippet:  public interface 
OffsetManager\n{\n  public Map<KafkaPartition, Long> loadInitialOffsets();\n  
public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);\n}", 
+            "title": "OffsetManager"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_2", 
+            "text": "Map<KafkaPartition, Long> loadInitialOffsets(): 
Specifies the initial offset for consuming messages; called at the activation 
stage.  updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions): 
This\nmethod is called at every repartitionCheckInterval to update 
offsets.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#partitioning", 
+            "text": "The logical instance of the KafkaInputOperator acts as 
the Partitioner\nas well as a StatsListener. This is because 
the\nAbstractKafkaInputOperator implements both 
the\ncom.datatorrent.api.Partitioner and 
com.datatorrent.api.StatsListener\ninterfaces and provides an implementation of 
definePartitions(...) and\nprocessStats(...) which makes it auto-scalable.", 
+            "title": "Partitioning"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#response-processstatsbatchedoperatorstats-stats",
 
+            "text": "The application master invokes this method on the logical 
instance with\nthe stats (tuplesProcessedPS, bytesPS, etc.) of each 
partition.\nRepartitioning happens when new Kafka partitions 
are added for\nthe topic or when bytesPS or msgPS crosses its upper 
bound.", 
+            "title": "Response processStats(BatchedOperatorStats stats)"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#definepartitions", 
+            "text": "Based on the repartitionRequired field of the Response 
object which is\nreturned by processStats(...) method, the application master 
invokes\ndefinePartitions(...) on the logical instance which is also 
the\npartitioner instance. Dynamic partitioning can be disabled by setting 
the\nrepartitionInterval parameter to a negative value.", 
+            "title": "DefinePartitions"
+        }, 
+        {
+            "location": 
"/operators/kafkaInputOperator/#abstractsingleportkafkainputoperator", 
+            "text": "This class extends AbstractKafkaInputOperator and has a 
single output\nport through which it emits the messages.", 
+            "title": "AbstractSinglePortKafkaInputOperator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#ports", 
+            "text": "outputPort  T : Tuples extracted from Kafka messages are 
emitted through\nthis port.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_3", 
+            "text": "T getTuple(Message msg) : Converts the Kafka message to 
tuple.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#concrete-classes", 
+            "text": "KafkaSinglePortStringInputOperator :\nThis class extends 
AbstractSinglePortKafkaInputOperator; its getTuple() method extracts a string from 
the Kafka message.    KafkaSinglePortByteArrayInputOperator:\nThis class extends 
AbstractSinglePortKafkaInputOperator; its getTuple() method extracts a byte array 
from the Kafka message.", 
+            "title": "Concrete Classes"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#application-example", 
+            "text": "This section builds an Apex application using Kafka input 
operator.\nBelow is the code snippet:  @ApplicationAnnotation(name = 
\"KafkaApp\")\npublic class ExampleKafkaApplication implements 
StreamingApplication\n{\n@Override\npublic void populateDAG(DAG dag, 
Configuration entries)\n{\n  KafkaSinglePortByteArrayInputOperator input = 
dag.addOperator(\"MessageReader\", new 
KafkaSinglePortByteArrayInputOperator());\n\n  ConsoleOutputOperator output = 
dag.addOperator(\"Output\", new ConsoleOutputOperator());\n\n  
dag.addStream(\"MessageData\", input.outputPort, output.input);\n}\n}  In the 
configuration below, \u201ctest\u201d is the Kafka topic name 
and\n\u201clocalhost:2181\u201d is the zookeeper quorum:  <property> 
<name>dt.operator.MessageReader.prop.topic</name> <value>test</value> 
</property>  <property> 
<name>dt.operator.MessageReader.prop.zookeeper</name> 
<value>localhost:2181</value> </property>", 
+            "title": "Application Example"
+        }, 
+        {
+            "location": "/operators/file_splitter/", 
+            "text": "File Splitter\n\n\nThis is a simple operator whose main 
function is to split a file virtually and create metadata describing the files 
and the splits. \n\n\nWhy is it needed?\n\n\nIt is a common operation to read a 
file and parse it. This operation can be parallelized by having multiple 
partitions of such operators and each partition operating on different files. 
However, when a file is large, a single partition reading it can 
become a bottleneck.\nIn these cases, throughput can be increased if instances 
of the partitioned operator can read and parse non-overlapping sets of file 
blocks. This is where the file splitter comes in handy. It creates metadata for the 
blocks of a file, which serve as tasks handed out to downstream operator 
partitions. \nThe downstream partitions can read/parse a block without 
needing to interact with other partitions.\n\n\nClass 
Diagram\n\n\n\n\nAbstractFileSplitter\n\n\nThe abstract implementation defines 
the logic of processing \nFileInfo\n. This comprises the following tasks -  
\n\n\n\n\n\n\nbuilding 
\nFileMetadata\n per file and emitting it. This metadata contains the file 
information such as filepath, no. of blocks in it, length of the file, all the 
block ids, etc.\n\n\n\n\n\n\ncreating \nBlockMetadataIterator\n from 
\nFileMetadata\n. The iterator lazy-loads the block metadata when needed. We 
use an iterator because the no. of blocks in a file can be huge if the block 
size is small and loading all of them at once in memory may cause out of memory 
errors.\n\n\n\n\n\n\nretrieving \nBlockMetadata.FileBlockMetadata\n from the 
block metadata iterator and emitting it. The FileBlockMetadata contains the 
block id, start offset of the block, length of file in the block, etc. The 
number of block metadata emitted per window is controlled by the 
\nblocksThreshold\n setting, which by default is 1.  \n\n\n\n\n\n\nThe main 
utility method that performs all the above tasks is the \nprocess()\n method. 
Concrete implementations can invoke this method whenever they have data to 
process.\n\n\nPorts\n\n\nDeclares only output ports on which file metadata and 
block metadata are emitted.\n\n\n\n\nfilesMetadataOutput: metadata for each 
file is emitted on this port. \n\n\nblocksMetadataOutput: metadata for each 
block is emitted on this port. \n\n\n\n\nprocess()\n method\n\n\nWhen process() 
is invoked, any pending blocks from the current file are emitted on the 
'blocksMetadataOutput' port. If the threshold for blocks per window is still 
not met then a new input file is processed - corresponding metadata is emitted 
on 'filesMetadataOutput' and more of its blocks are emitted. This operation is 
repeated until the \nblocksThreshold\n is reached or there are no more new 
files.\n\n\n  protected void process()\n  {\n    if (blockMetadataIterator != 
null && blockCount < blocksThreshold) {\n      emitBlockMetadata();\n    }\n\n 
   FileInfo fileInfo;\n    while (blockCount < blocksThreshold && (fileInfo = 
getFileInfo()) != null) {\n      if (!processFileInfo(fileInfo)) {\n        break;\n      
}\n    }\n  }\n\n\n\n\nAbstract methods\n\n\n\n\n\n\nFileInfo getFileInfo()\n: 
called from within the \nprocess()\n and provides the next file to 
process.\n\n\n\n\n\n\nlong getDefaultBlockSize()\n: provides the block size 
which is used when the user hasn't configured the size.\n\n\n\n\n\n\nFileStatus 
getFileStatus(Path path)\n: provides the \norg.apache.hadoop.fs.FileStatus\n 
instance for a path.   \n\n\n\n\n\n\nConfiguration\n\n\n\n\nblockSize\n: size 
of a block.\n\n\nblocksThreshold\n: threshold on the number of blocks emitted 
by file splitter every window. This setting is used for throttling the work for 
downstream operators.\n\n\n\n\nFileSplitterBase\n\n\nSimple operator that 
receives tuples of type \nFileInfo\n on its \ninput\n port. \nFileInfo\n 
contains the information (currently just the file path) about the file which 
this operator uses to create file metadata and block metadata.\n\n\nExample 
application\n\n\nThis is a simple sub-dag that demonstrates how FileSplitterBase can 
be plugged into an application.\n\n\n\nThe upstream operator emits tuples of 
type \nFileInfo\n on its output port, which is connected to the splitter's input port. 
The downstream operator receives tuples of type \nBlockMetadata.FileBlockMetadata\n from 
the splitter's block metadata output port.\n\n\npublic class 
ApplicationWithBaseSplitter implements StreamingApplication\n{\n  @Override\n  
public void populateDAG(DAG dag, Configuration configuration)\n  {\n    
JMSInput input = dag.addOperator(\"Input\", new JMSInput());\n    
FileSplitterBase splitter = dag.addOperator(\"Splitter\", new 
FileSplitterBase());\n    FSSliceReader blockReader = 
dag.addOperator(\"BlockReader\", new FSSliceReader());\n    ...\n    
dag.addStream(\"file-info\", input.output, splitter.input);\n    
dag.addStream(\"block-metadata\", splitter.blocksMetadataOutput, 
blockReader.blocksMetadataInput);\n    ...\n  }\n\n  public static class 
JMSInput extends AbstractJMSInputOperator<AbstractFileSplitter.FileInfo>\n  {\n\n    public 
final transient DefaultOutputPort<AbstractFileSplitter.FileInfo> output = new 
DefaultOutputPort<>();\n\n    @Override\n    protected 
AbstractFileSplitter.FileInfo convert(Message message) throws JMSException\n    
{\n      //assuming the message is a text message containing the absolute path 
of the file.\n      return new AbstractFileSplitter.FileInfo(null, 
((TextMessage)message).getText());\n    }\n\n    @Override\n    protected void 
emit(AbstractFileSplitter.FileInfo payload)\n    {\n      
output.emit(payload);\n    }\n  }\n}\n\n\n\n\nPorts\n\n\nDeclares an input port 
on which it receives tuples from the upstream operator. Output ports are 
inherited from AbstractFileSplitter.\n\n\n\n\ninput: non-optional port on which 
tuples of type \nFileInfo\n are 
received.\n\n\n\n\nConfiguration\n\n\n\n\nfile\n: path of the file from which 
the filesystem is inferred. FileSplitter creates an instance of 
\norg.apache.hadoop.fs.FileSystem\n which is why this path is needed.  
\n\n\n\n\nFileSystem.newInstance(new Path(file).toUri(), new 
Configuration());\n\n\n\n\nThe fs instance is then used to fetch the default 
block size and \norg.apache.hadoop.fs.FileStatus\n for each file 
path.\n\n\nFileSplitterInput\n\n\nThis is an input operator that discovers 
files itself. The scanning of the directories for new files is asynchronous 
which is handled by \nTimeBasedDirectoryScanner\n. The function of 
TimeBasedDirectoryScanner is to periodically scan specified directories and 
find files which were newly added or modified. The interaction between the 
operator and the scanner is depicted in the diagram below.\n\n\n\n\nExample 
application\n\n\nThis is a simple sub-dag that demonstrates how 
FileSplitterInput can be plugged into an application.\n\n\n\n\nSplitter is the 
input operator here that sends block metadata to the downstream 
BlockReader.\n\n\n  @Override\n  public void populateDAG(DAG dag, Configuration 
configuration)\n  {\n    FileSplitterInput input = dag.addOperator(\"Input\", new 
FileSplitterInput());\n    FSSliceReader reader = 
dag.addOperator(\"Block Reader\", new FSSliceReader());\n    ...\n    
dag.addStream(\"block-metadata\", 
input.blocksMetadataOutput, reader.blocksMetadataInput);\n    ...\n  
}\n\n\n\n\n\nPorts\n\n\nSince it is an input operator there are no input ports 
and output ports are inherited from 
AbstractFileSplitter.\n\n\nConfiguration\n\n\n\n\nscanner\n: the component that 
scans directories asynchronously. It is of type 
\ncom.datatorrent.lib.io.fs.FileSplitter.TimeBasedDirectoryScanner\n. The basic 
implementation of TimeBasedDirectoryScanner can be customized by users.  
\n\n\n\n\na. \nfiles\n: comma-separated list of directories to scan.  \n\n\nb. 
\nrecursive\n: flag that controls whether the directories should be scanned 
recursively.  \n\n\nc. \nscanIntervalMillis\n: interval specified in 
milliseconds after which another scan iteration is triggered.  \n\n\nd. 
\nfilePatternRegularExp\n: regular expression for accepted file names.  \n\n\ne. 
\ntrigger\n: a flag that triggers a scan iteration instantly. If the scanner 
thread is idle, it initiates a scan immediately; otherwise, if a scan 
is in progress, the new iteration is triggered immediately after the 
current one completes.\n2. \nidempotentStorageManager\n: by default 
FileSplitterInput is idempotent. \nIdempotency ensures that the operator will 
process the same set of files/blocks in a window if it has seen that window 
previously, i.e., before a failure. For example, let's say the operator 
completed window 10 and failed somewhere in window 11. If the operator 
gets restored at window 10 then it will process the same file/block again in 
window 10 which it did in the previous run before the failure. Idempotency is 
important but comes at a higher cost because at the end of each window the 
operator needs to persist some state with respect to that window. Therefore, if 
one doesn't care about idempotency then they can set this property to be an 
instance of 
\ncom.datatorrent.lib.io.IdempotentStorageManager.NoopIdempotentStorageManager\n.\n\n\nHandling
 of split records\n\n\nSplitting of files to create tasks for downstream 
operator needs to be a simple operation that doesn't consume a lot of resources 
and is fast. This is why the file splitter doesn't open files to read. The 
downside is that if the file contains records, a record may be split across 
adjacent blocks. Handling of this is left to the downstream operator.\n\n\nWe 
have created block readers in the Apex Malhar library that handle line splits 
efficiently. The two line readers, \nAbstractFSLineReader\n and 
\nAbstractFSReadAheadLineReader\n, can be found in 
\nAbstractFSBlockReader\n.", 
+            "title": "File Splitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#file-splitter", 
+            "text": "This is a simple operator whose main function is to split 
a file virtually and create metadata describing the files and the splits.", 
+            "title": "File Splitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#why-is-it-needed", 
+            "text": "It is a common operation to read a file and parse it. 
This operation can be parallelized by having multiple partitions of such 
operators and each partition operating on different files. However, 
when a file is large, a single partition reading it can become a 
bottleneck.\nIn these cases, throughput can be increased if instances of the 
partitioned operator can read and parse non-overlapping sets of file blocks. 
This is where the file splitter comes in handy. It creates metadata for the blocks of 
a file, which serve as tasks handed out to downstream operator partitions. \nThe 
downstream partitions can read/parse a block without needing to interact 
with other partitions.", 
+            "title": "Why is it needed?"
+        }, 
+        {
+            "location": "/operators/file_splitter/#class-diagram", 
+            "text": "", 
+            "title": "Class Diagram"
+        }, 
+        {
+            "location": "/operators/file_splitter/#abstractfilesplitter", 
+            "text": "The abstract implementation defines the logic of 
processing  FileInfo . This comprises the following tasks -      building  
FileMetadata  per file and emitting it. This metadata contains the file 
information such as filepath, no. of blocks in it, length of the file, all the 
block ids, etc.    creating  BlockMetadataIterator  from  FileMetadata . The 
iterator lazy-loads the block metadata when needed. We use an iterator because 
the no. of blocks in a file can be huge if the block size is small and loading 
all of them at once in memory may cause out of memory errors.    retrieving  
BlockMetadata.FileBlockMetadata  from the block metadata iterator and emitting 
it. The FileBlockMetadata contains the block id, start offset of the block, 
length of file in the block, etc. The number of block metadata emitted per 
window is controlled by the  blocksThreshold  setting, which by default is 1.      
The main utility method that performs all the above tasks is the  process()  
method. Concrete implementations can invoke this method whenever they have data 
to process.", 
+            "title": "AbstractFileSplitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#ports", 
+            "text": "Declares only output ports on which file metadata and 
block metadata are emitted.   filesMetadataOutput: metadata for each file is 
emitted on this port.   blocksMetadataOutput: metadata for each block is 
emitted on this port.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/file_splitter/#abstract-methods", 
+            "text": "FileInfo getFileInfo() : called from within the  
process()  and provides the next file to process.    long getDefaultBlockSize() 
: provides the block size which is used when the user hasn't configured the size.   
 FileStatus getFileStatus(Path path) : provides the  
org.apache.hadoop.fs.FileStatus  instance for a path.", 
+            "title": "Abstract methods"
+        }, 
+        {
+            "location": "/operators/file_splitter/#configuration", 
+            "text": "blockSize : size of a block.  blocksThreshold : threshold 
on the number of blocks emitted by file splitter every window. This setting is 
used for throttling the work for downstream operators.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/file_splitter/#filesplitterbase", 
+            "text": "Simple operator that receives tuples of type  FileInfo  
on its  input  port.  FileInfo  contains the information (currently just the 
file path) about the file which this operator uses to create file metadata and 
block metadata.", 
+            "title": "FileSplitterBase"
+        }, 
+        {
+            "location": "/operators/file_splitter/#example-application", 
+            "text": "This is a simple sub-dag that demonstrates how 
FileSplitterBase can be plugged into an application.  The upstream operator 
emits tuples of type  FileInfo  on its output port, which is connected to the 
splitter's input port. The downstream operator receives tuples of type  
BlockMetadata.FileBlockMetadata  from the splitter's block metadata output 
port.  public class ApplicationWithBaseSplitter implements 
StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, 
Configuration configuration)\n  {\n    JMSInput input = 
dag.addOperator(\"Input\", new JMSInput());\n    FileSplitterBase splitter = 
dag.addOperator(\"Splitter\", new FileSplitterBase());\n    FSSliceReader blockReader = 
dag.addOperator(\"BlockReader\", new FSSliceReader());\n    ...\n    
dag.addStream(\"file-info\", input.output, splitter.input);\n    
dag.addStream(\"block-metadata\", 
splitter.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    ...\n  
}\n\n  public static class JMSInput extends 
AbstractJMSInputOperator<AbstractFileSplitter.FileInfo>\n  {\n\n    public final transient 
DefaultOutputPort<AbstractFileSplitter.FileInfo> output = new 
DefaultOutputPort<>();\n\n    @Override\n    protected AbstractFileSplitter.FileInfo 
convert(Message message) throws JMSException\n    {\n      //assuming the 
message is a text message containing the absolute path of the file.\n      
return new AbstractFileSplitter.FileInfo(null, 
((TextMessage)message).getText());\n    }\n\n    @Override\n    protected void 
emit(AbstractFileSplitter.FileInfo payload)\n    {\n      
output.emit(payload);\n    }\n  }\n}", 
+            "title": "Example application"
+        }, 
+        {
+            "location": "/operators/file_splitter/#ports_1", 
+            "text": "Declares an input port on which it receives tuples from 
the upstream operator. Output ports are inherited from AbstractFileSplitter.   
input: non-optional port on which tuples of type  FileInfo  are received.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/file_splitter/#configuration_1", 
+            "text": "file : path of the file from which the filesystem is 
inferred. FileSplitter creates an instance of  org.apache.hadoop.fs.FileSystem  
which is why this path is needed.     FileSystem.newInstance(new 
Path(file).toUri(), new Configuration());  The fs instance is then used to 
fetch the default block size and  org.apache.hadoop.fs.FileStatus  for each 
file path.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/file_splitter/#filesplitterinput", 
+            "text": "This is an input operator that discovers files itself. 
The scanning of the directories for new files is asynchronous which is handled 
by  TimeBasedDirectoryScanner . The function of TimeBasedDirectoryScanner is to 
periodically scan specified directories and find files which were newly added 
or modified. The interaction between the operator and the scanner is depicted 
in the diagram below.", 
+            "title": "FileSplitterInput"
+        }, 
+        {
+            "location": "/operators/file_splitter/#example-application_1", 
+            "text": "This is a simple sub-dag that demonstrates how 
FileSplitterInput can be plugged into an application.   Splitter is the input 
operator here that sends block metadata to the downstream BlockReader.    
@Override\n  public void populateDAG(DAG dag, Configuration configuration)\n  
{\n    FileSplitterInput input = dag.addOperator(\"Input\", new 
FileSplitterInput());\n    FSSliceReader reader = 
dag.addOperator(\"Block Reader\", new FSSliceReader());\n    ...\n    
dag.addStream(\"block-metadata\", 
input.blocksMetadataOutput, reader.blocksMetadataInput);\n    ...\n  }", 
+            "title": "Example application"
+        }, 
+        {
+            "location": "/operators/file_splitter/#ports_2", 
+            "text": "Since it is an input operator there are no input ports 
and output ports are inherited from AbstractFileSplitter.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/file_splitter/#configuration_2", 
+            "text": "scanner : the component that scans directories 
asynchronously. It is of type  
com.datatorrent.lib.io.fs.FileSplitter.TimeBasedDirectoryScanner . The basic 
implementation of TimeBasedDirectoryScanner can be customized by users.     a.  
files : comma-separated list of directories to scan.    b.  recursive : flag 
that controls whether the directories should be scanned recursively.    c.  
scanIntervalMillis : interval specified in milliseconds after which another 
scan iteration is triggered.    d.  filePatternRegularExp : regular expression 
for accepted file names.    e.  trigger : a flag that triggers a scan iteration 
instantly. If the scanner thread is idle, it initiates a scan 
immediately; otherwise, if a scan is in progress, the new iteration is 
triggered immediately after the current one completes.\n2.  
idempotentStorageManager : by default FileSplitterInput is idempotent. 
\nIdempotency ensures that the operator will process the same set 
of files/blocks in a window if it has seen that window previously, i.e., before 
a failure. For example, let's say the operator completed window 10 and failed 
somewhere in window 11. If the operator gets restored at window 10 then it 
will process the same file/block again in window 10 which it did in the 
previous run before the failure. Idempotency is important but comes at a higher 
cost because at the end of each window the operator needs to persist some state 
with respect to that window. Therefore, if one doesn't care about idempotency 
then they can set this property to be an instance of  
com.datatorrent.lib.io.IdempotentStorageManager.NoopIdempotentStorageManager 
.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/file_splitter/#handling-of-split-records", 
+            "text": "Splitting of files to create tasks for downstream 
operator needs to be a simple operation that doesn't consume a lot of resources 
and is fast. This is why the file splitter doesn't open files to read. The 
downside is that if the file contains records, a record may be split across 
adjacent blocks. Handling of this is left to the downstream operator.  We have 
created block readers in the Apex Malhar library that handle line splits 
efficiently. The two line readers,  AbstractFSLineReader  and  
AbstractFSReadAheadLineReader , can be found in  AbstractFSBlockReader .", 
+            "title": "Handling of split records"
+        }, 
+        {
+            "location": "/operators/block_reader/", 
+            "text": "Block Reader\n\n\nThis is a scalable operator that reads 
and parses blocks of data sources into records. A data source can be a file or 
a message bus that contains records and a block defines a chunk of data in the 
source by specifying the block offset and the length of the source belonging to 
the block. \n\n\nWhy is it needed?\n\n\nA Block Reader is needed to parallelize 
reading and parsing of a single data source, for example a file. Simple 
parallelism of reading data sources can be achieved by multiple partitions 
reading different sources of the same type (for files see 
\nAbstractFileInputOperator\n), but Block Reader partitions can read blocks of the 
same source in parallel and parse them for records, ensuring that no record is 
duplicated or missed.\n\n\nClass Diagram\n\n\n\n\nAbstractBlockReader\n\n\nThis 
is the abstract implementation that serves as the base for different types of 
data sources. It defines how a block metadata is processed. The flow diagram 
below describes the processing of a block 
metadata.\n\n\n\n\nPorts\n\n\n\n\n\n\nblocksMetadataInput: input port on which 
block metadata are received.\n\n\n\n\n\n\nblocksMetadataOutput: output port on 
which block metadata are emitted if the port is connected. This port is useful 
when a downstream operator that receives records from block reader may also be 
interested to know the details of the corresponding 
blocks.\n\n\n\n\n\n\nmessages: output port on which tuples of type 
\ncom.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord\n are emitted. 
This class encapsulates a \nrecord\n and the \nblockId\n of the corresponding 
block.\n\n\n\n\n\n\nreaderContext\n\n\nThis is one of the most important fields 
in the block reader. It is of type 
\ncom.datatorrent.lib.io.block.ReaderContext\n and is responsible for fetching 
bytes that make a record. It also lets the reader know how many total bytes 
were consumed which may not be equal to the total bytes in a record because 
consumed bytes also include 
bytes for the record delimiter which may not be a part of the actual 
record.\n\n\nOnce the reader creates an input stream for the block (or uses the 
previously opened stream if the current block is the successor of the previous block), 
it initializes the reader context by invoking 
\nreaderContext.initialize(stream, blockMetadata, consecutiveBlock);\n. 
The initialize method is where any implementation of \nReaderContext\n can perform 
all the operations which have to be executed just before reading the block or 
create states which are used during the lifetime of reading the 
block.\n\n\nOnce the initialization is done, \nreaderContext.next()\n is called 
repeatedly until it returns \nnull\n. It is left to the \nReaderContext\n 
implementations to decide when a block is completely processed. In cases when a 
record is split across adjacent blocks, reader context may decide to read ahead 
of the current block boundary to completely fetch the split record (examples- 
\nLineReaderContext\n and \nReadAheadLineReaderContext\n). In other cases when there 
isn't a possibility of split 
record (example- \nFixedBytesReaderContext\n), it returns \nnull\n immediately 
when the block boundary is reached. The return type of \nreaderContext.next()\n 
is of type \ncom.datatorrent.lib.io.block.ReaderContext.Entity\n which is just 
a wrapper for a \nbyte[]\n that represents the record and total bytes used in 
fetching the record.\n\n\nAbstract methods\n\n\n\n\n\n\nSTREAM setupStream(B 
block)\n: creating a stream for a block is dependent on the type of source 
which is not known to AbstractBlockReader. Sub-classes which deal with a 
specific data source provide this implementation.\n\n\n\n\n\n\nR 
convertToRecord(byte[] bytes)\n: this converts the array of bytes into the 
actual instance of record type.\n\n\n\n\n\n\nAuto-scalability\n\n\nBlock reader 
can auto-scale, that is, depending on the backlog (total number of all the 
blocks which are waiting in the \nblocksMetadataInput\n port queue of all 
partitions) it can create more partitions or reduce them. Details are discussed in the 
last section which covers the \npartitioner and 
stats-listener\n.\n\n\nConfiguration\n\n\n\n\nmaxReaders\n: when auto-scaling 
is enabled, this controls the maximum number of block reader partitions that 
can be created.\n\n\nminReaders\n: when auto-scaling is enabled, this controls 
the minimum number of block reader partitions that should always 
exist.\n\n\ncollectStats\n: this enables or disables auto-scaling. When it is 
set to \ntrue\n the stats (number of blocks in the queue) are collected and 
this triggers partitioning; otherwise auto-scaling is 
disabled.\n\n\nintervalMillis\n: when auto-scaling is enabled, this specifies 
the interval at which the reader will trigger the logic of computing the 
backlog and auto-scale.\n\n\n\n\n AbstractFSBlockReader\n\n\nThis abstract 
implementation deals with files. Different types of file systems that are 
implementations of \norg.apache.hadoop.fs.FileSystem\n are supported. The user can 
override \ngetFSInstance()\n method to create an instance 
of a specific \nFileSystem\n. By default, filesystem instance is created from 
the filesytem URI that comes from the default hadoop 
configuration.\n\n\nprotected FileSystem getFSInstance() throws 
IOException\n{\n  return FileSystem.newInstance(configuration);\n}\n\n\n\n\nIt 
uses this filesystem instance to setup a stream of type 
\norg.apache.hadoop.fs.FSDataInputStream\n to read the 
block.\n\n\n@Override\nprotected FSDataInputStream 
setupStream(BlockMetadata.FileBlockMetadata block) throws IOException\n{\n  
return fs.open(new Path(block.getFilePath()));\n}\n\n\n\n\nAll the ports and 
configurations are derived from the super class. It doesn't provide an 
implementation of \nconvertToRecord(byte[] bytes)\n method which is delegated 
to concrete sub-classes.\n\n\nExample Application\n\n\nThis simple dag 
demonstrates how any concrete implementation of \nAbstractFSBlockReader\n can 
be plugged into an application. \n\n\n\n\nIn the above application, file splitter creates 
block metadata for 
files which are sent to block reader. Partitions of the block reader parses the 
file blocks for records which are filtered, transformed and then persisted to a 
file (created per block). Therefore block reader is parallel partitioned with 
the 2 downstream operators - filter/converter and record output operator. The 
code which implements this dag is below.\n\n\npublic class ExampleApplication 
implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG 
dag, Configuration configuration)\n  {\n    FileSplitterInput input = 
dag.addOperator(\nFile-splitter\n, new FileSplitterInput());\n    //any 
concrete implementation of AbstractFSBlockReader based on the use-case can be 
added here.\n    LineReader blockReader = dag.addOperator(\nBlock-reader\n, new 
LineReader());\n    Filter filter = dag.addOperator(\nFilter\n, new 
Filter());\n    RecordOutputOperator recordOutputOperator = 
dag.addOperator(\nRecord-writer\n, new RecordOutputOperator());\n\n    dag.addStream(\nfile-block 
metadata\n, input.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    
dag.addStream(\nrecords\n, blockReader.messages, filter.input);\n    
dag.addStream(\nfiltered-records\n, filter.output, 
recordOutputOperator.input);\n  }\n\n  /**\n   * Concrete implementation of 
{@link AbstractFSBlockReader} for which a record is a line in the file.\n   
*/\n  public static class LineReader extends 
AbstractFSBlockReader.AbstractFSReadAheadLineReader\nString\n\n  {\n\n    
@Override\n    protected String convertToRecord(byte[] bytes)\n    {\n      
return new String(bytes);\n    }\n  }\n\n  /**\n   * Considers any line 
starting with a '.' as invalid. Emits the valid records.\n   */\n  public 
static class Filter extends BaseOperator\n  {\n    public final transient 
DefaultOutputPort\nAbstractBlockReader.ReaderRecord\nString\n output = new 
DefaultOutputPort\n();\n    public final transient 
DefaultInputPort\nAbstractBlockReader.ReaderRecord\nString\n input = new 
DefaultInputPort\nAbstractBlockReader.ReaderRecord\nString\n()\n    {\n      
@Override\n      public void process(AbstractBlockReader.ReaderRecord\nString\n 
stringRecord)\n      {\n        //filter records and transform\n        //if 
the string starts with a '.' ignore the string.\n        if 
(!StringUtils.startsWith(stringRecord.getRecord(), \n.\n)) {\n          
output.emit(stringRecord);\n        }\n      }\n    };\n  }\n\n  /**\n   * 
Persists the valid records to corresponding block files.\n   */\n  public 
static class RecordOutputOperator extends 
AbstractFileOutputOperator\nAbstractBlockReader.ReaderRecord\nString\n\n  {\n   
 @Override\n    protected String 
getFileName(AbstractBlockReader.ReaderRecord\nString\n tuple)\n    {\n      
return Long.toHexString(tuple.getBlockId());\n    }\n\n    @Override\n    
protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord\nString\n 
tuple)\n    {\n      return tuple.getRecord().getBytes();\n    }\n  }\n}\n\n\n\n\nConfiguration to 
parallel partition block reader 
with its downstream operators.\n\n\n  \nproperty\n\n    
\nname\ndt.operator.Filter.port.input.attr.PARTITION_PARALLEL\n/name\n\n    
\nvalue\ntrue\n/value\n\n  \n/property\n\n  \nproperty\n\n    
\nname\ndt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL\n/name\n\n 
   \nvalue\ntrue\n/value\n\n  
\n/property\n\n\n\n\n\nAbstractFSReadAheadLineReader\n\n\nThis extension of 
\nAbstractFSBlockReader\n parses lines from a block and binds the 
\nreaderContext\n field to an instance of 
\nReaderContext.ReadAheadLineReaderContext\n.\n\n\nIt is abstract because it 
doesn't provide an implementation of \nconvertToRecord(byte[] bytes)\n since 
the user may want to convert the bytes that make a line into some other type. 
\n\n\nReadAheadLineReaderContext\n\n\nIn order to handle a line split across 
adjacent blocks, ReadAheadLineReaderContext always reads beyond the block 
boundary and ignores the bytes till the first end-of-line character of all the 
blocks except the first block of the file. This 
ensures that no line is missed or incomplete.\n\n\nThis is one of the most 
common ways of handling a split record. It doesn't require any further 
information to decide if a line is complete. However, the cost of this 
consistent way to handle a line split is that it always reads from the next 
block.\n\n\nAbstractFSLineReader\n\n\nSimilar to 
\nAbstractFSReadAheadLineReader\n, even this parses lines from a block. 
However, it binds the \nreaderContext\n field to an instance of 
\nReaderContext.LineReaderContext\n.\n\n\nLineReaderContext\n\n\nThis handles 
the line split differently from \nReadAheadLineReaderContext\n. It doesn't 
always read from the next block. If the end of the last line is aligned with 
the block boundary then it stops processing the block. It does read from the 
next block when the boundaries are not aligned, that is, last line extends 
beyond the block boundary. The result of this is an inconsistency in reading 
the next block.\n\n\nWhen the boundary of the last line of 
the previous block was aligned with its block, then the first line of the 
current block is a valid line. However, in the other case the bytes from the 
block start offset to the first end-of-line character should be ignored. 
Therefore, this means that any record formed by this reader context has to be 
validated. For example, if the lines are of fixed size then size of each record 
can be validated or if each line begins with a special field then that 
knowledge can be used to check if a record is complete.\n\n\nIf the validations 
of completeness fails for a line then \nconvertToRecord(byte[] bytes)\n should 
return null.\n\n\nFSSliceReader\n\n\nA concrete extension of 
\nAbstractFSBlockReader\n that reads fixed-size \nbyte[]\n from a block and 
emits the byte array wrapped in 
\ncom.datatorrent.netlet.util.Slice\n.\n\n\nThis operator binds the 
\nreaderContext\n to an instance of 
\nReaderContext.FixedBytesReaderContext\n.\n\n\nFixedBytesReaderContext\n\n\nThis implementation of 
\nReaderContext\n never reads beyond a block boundary which can result in the 
last \nbyte[]\n of a block to be of a shorter length than the rest of the 
records.\n\n\nConfiguration\n\n\nreaderContext.length\n: length of each record. 
By default, this is initialized to the default hdfs block 
size.\n\n\nPartitioner and StatsListener\n\n\nThe logical instance of the block 
reader acts as the Partitioner (unless a custom partitioner is set using the 
operator attribute - \nPARTITIONER\n) as well as a StatsListener. This is 
because the \n\nAbstractBlockReader\n implements both the 
\ncom.datatorrent.api.Partitioner\n and \ncom.datatorrent.api.StatsListener\n 
interfaces and provides an implementation of \ndefinePartitions(...)\n and 
\nprocessStats(...)\n which make it auto-scalable.\n\n\nprocessStats \n\n\nThe 
application master invokes \nResponse processStats(BatchedOperatorStats 
stats)\n method on the logical instance with the 
stats (\ntuplesProcessedPSMA\n, \ntuplesEmittedPSMA\n, \nlatencyMA\n, etc.) of 
each partition. The data which this operator is interested in is the 
\nqueueSize\n of the input port \nblocksMetadataInput\n.\n\n\nUsually the 
\nqueueSize\n of an input port gives the count of waiting control tuples plus 
data tuples. However, if a stats listener is interested only in the count of 
data tuples then that can be expressed by annotating the class with 
\n@DataQueueSize\n. In this case \nAbstractBlockReader\n itself is the 
\nStatsListener\n which is why it is annotated with 
\n@DataQueueSize\n.\n\n\nThe logical instance caches the queue size per 
partition and at regular intervals (configured by \nintervalMillis\n) sums 
these values to find the total backlog which is then used to decide whether 
re-partitioning is needed. The flow-diagram below describes this 
logic.\n\n\n\n\nThe goal of this logic is to create as many partitions within 
bounds (see \nmaxReaders\n and \nminReaders\n above) to quickly reduce this 
backlog or if the backlog is small then remove any idle 
partitions.\n\n\ndefinePartitions\n\n\nBased on the \nrepartitionRequired\n 
field of the \nResponse\n object which is returned by \nprocessStats\n method, 
the application master invokes 
\n\n\nCollection\nPartition\nAbstractBlockReader\n...\n 
definePartitions(Collection\nPartition\nAbstractBlockReader\n...\n partitions, 
PartitioningContext context)\n\n\n\n\non the logical instance which is also the 
partitioner instance. The implementation calculates the difference between 
required partitions and the existing count of partitions. If this difference is 
negative, then equivalent number of partitions are removed otherwise new 
partitions are created. \n\n\nPlease note auto-scaling can be disabled by 
setting \ncollectStats\n to \nfalse\n. If the use-case requires only static 
partitioning, then that can be achieved by setting \nStatelessPartitioner\n as 
the operator attribute- \nPARTITIONER\n on the block reader.", 
+            "title": "Block Reader"
+        }, 
+        {
+            "location": "/operators/block_reader/#block-reader", 
+            "text": "This is a scalable operator that reads and parses blocks 
of data sources into records. A data source can be a file or a message bus that 
contains records and a block defines a chunk of data in the source by 
specifying the block offset and the length of the source belonging to the 
block.", 
+            "title": "Block Reader"
+        }, 
+        {
+            "location": "/operators/block_reader/#why-is-it-needed", 
+            "text": "A Block Reader is needed to parallelize reading and 
parsing of a single data source, for example a file. Simple parallelism of 
reading data sources can be achieved by multiple partitions reading different 
source of same type (for files see  AbstractFileInputOperator ) but Block 
Reader partitions can read blocks of same source in parallel and parse them for 
records ensuring that no record is duplicated or missed.", 
+            "title": "Why is it needed?"
+        }, 
+        {
+            "location": "/operators/block_reader/#class-diagram", 
+            "text": "", 
+            "title": "Class Diagram"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstractblockreader", 
+            "text": "This is the abstract implementation that serves as the 
base for different types of data sources. It defines how a block metadata is 
processed. The flow diagram below describes the processing of a block 
metadata.", 
+            "title": "AbstractBlockReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#ports", 
+            "text": "blocksMetadataInput: input port on which block metadata 
are received.    blocksMetadataOutput: output port on which block metadata are 
emitted if the port is connected. This port is useful when a downstream 
operator that receives records from block reader may also be interested to know 
the details of the corresponding blocks.    messages: output port on which 
tuples of type  com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord  
are emitted. This class encapsulates a  record  and the  blockId  of the 
corresponding block.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/block_reader/#readercontext", 
+            "text": "This is one of the most important fields in the block 
reader. It is of type  com.datatorrent.lib.io.block.ReaderContext  and is 
responsible for fetching bytes that make a record. It also lets the reader know 
how many total bytes were consumed which may not be equal to the total bytes in 
a record because consumed bytes also include bytes for the record delimiter 
which may not be a part of the actual record.  Once the reader creates an input 
stream for the block (or uses the previous opened stream if the current block 
is successor of the previous block) it initializes the reader context by 
invoking  readerContext.initialize(stream, blockMetadata, consecutiveBlock); . 
Initialize method is where any implementation of  ReaderContext  can perform 
all the operations which have to be executed just before reading the block or 
create states which are used during the lifetime of reading the block.  Once 
the initialization is done,  readerContext.next()  is called repeatedly until 
it returns  null . It is left to the  ReaderContext  implementations 
to decide when a block is completely processed. In cases when a record is split 
across adjacent blocks, reader context may decide to read ahead of the current 
block boundary to completely fetch the split record (examples-  
LineReaderContext  and  ReadAheadLineReaderContext ). In other cases when there 
isn't a possibility of split record (example-  FixedBytesReaderContext ), it 
returns  null  immediately when the block boundary is reached. The return type 
of  readerContext.next()  is of type  
com.datatorrent.lib.io.block.ReaderContext.Entity  which is just a wrapper for 
a  byte[]  that represents the record and total bytes used in fetching the 
record.", 
+            "title": "readerContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstract-methods", 
+            "text": "STREAM setupStream(B block) : creating a stream for a 
block is dependent on the type of source which is not known to 
AbstractBlockReader. Sub-classes which deal with a specific data source provide 
this implementation.    R convertToRecord(byte[] bytes) : this converts the 
array of bytes into the actual instance of record type.", 
+            "title": "Abstract methods"
+        }, 
+        {
+            "location": "/operators/block_reader/#auto-scalability", 
+            "text": "Block reader can auto-scale, that is, depending on the 
backlog (total number of all the blocks which are waiting in the  
blocksMetadataInput  port queue of all partitions) it can create more 
partitions or reduce them. Details are discussed in the last section which 
covers the  partitioner and stats-listener .", 
+            "title": "Auto-scalability"
+        }, 
+        {
+            "location": "/operators/block_reader/#configuration", 
+            "text": "maxReaders : when auto-scaling is enabled, this controls 
the maximum number of block reader partitions that can be created.  minReaders 
: when auto-scaling is enabled, this controls the minimum number of block 
reader partitions that should always exist.  collectStats : this enables or 
disables auto-scaling. When it is set to  true  the stats (number of blocks in 
the queue) are collected and this triggers partitioning; otherwise auto-scaling 
is disabled.  intervalMillis : when auto-scaling is enabled, this specifies the 
interval at which the reader will trigger the logic of computing the backlog 
and auto-scale.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/block_reader/#example-application", 
+            "text": "This simple dag demonstrates how any concrete 
implementation of  AbstractFSBlockReader  can be plugged into an application.   
 In the above application, file splitter creates block metadata for files which 
are sent to block reader. Partitions of the block reader parses the file blocks 
for records which are filtered, transformed and then persisted to a file 
(created per block). Therefore block reader is parallel partitioned with the 2 
downstream operators - filter/converter and record output operator. The code 
which implements this dag is below.  public class ExampleApplication implements 
StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, 
Configuration configuration)\n  {\n    FileSplitterInput input = 
dag.addOperator( File-splitter , new FileSplitterInput());\n    //any concrete 
implementation of AbstractFSBlockReader based on the use-case can be added 
here.\n    LineReader blockReader = dag.addOperator( Block-reader , new 
LineReader());\n 
    Filter filter = dag.addOperator( Filter , new Filter());\n    
RecordOutputOperator recordOutputOperator = dag.addOperator( Record-writer , 
new RecordOutputOperator());\n\n    dag.addStream( file-block metadata , 
input.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    
dag.addStream( records , blockReader.messages, filter.input);\n    
dag.addStream( filtered-records , filter.output, recordOutputOperator.input);\n 
 }\n\n  /**\n   * Concrete implementation of {@link AbstractFSBlockReader} for 
which a record is a line in the file.\n   */\n  public static class LineReader 
extends AbstractFSBlockReader.AbstractFSReadAheadLineReader String \n  {\n\n    
@Override\n    protected String convertToRecord(byte[] bytes)\n    {\n      
return new String(bytes);\n    }\n  }\n\n  /**\n   * Considers any line 
starting with a '.' as invalid. Emits the valid records.\n   */\n  public 
static class Filter extends BaseOperator\n  {\n    public final transient 
DefaultOutputPort AbstractBlockReader.ReaderRecord String  output = new 
DefaultOutputPort ();\n    public final 
transient DefaultInputPort AbstractBlockReader.ReaderRecord String  input = new 
DefaultInputPort AbstractBlockReader.ReaderRecord String ()\n    {\n      
@Override\n      public void process(AbstractBlockReader.ReaderRecord String  
stringRecord)\n      {\n        //filter records and transform\n        //if 
the string starts with a '.' ignore the string.\n        if 
(!StringUtils.startsWith(stringRecord.getRecord(),  . )) {\n          
output.emit(stringRecord);\n        }\n      }\n    };\n  }\n\n  /**\n   * 
Persists the valid records to corresponding block files.\n   */\n  public 
static class RecordOutputOperator extends AbstractFileOutputOperator 
AbstractBlockReader.ReaderRecord String \n  {\n    @Override\n    protected 
String getFileName(AbstractBlockReader.ReaderRecord String  tuple)\n    {\n     
 return Long.toHexString(tuple.getBlockId());\n    }\n\n    @Override\n    
protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord String  
tuple)\n    {\n      return 
tuple.getRecord().getBytes();\n    }\n  }\n}  Configuration to parallel 
partition block reader with its downstream operators.     property \n     name 
dt.operator.Filter.port.input.attr.PARTITION_PARALLEL /name \n     value true 
/value \n   /property \n   property \n     name 
dt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL /name \n     value 
true /value \n   /property", 
+            "title": "Example Application"
+        }, 
+        {
+            "location": 
"/operators/block_reader/#abstractfsreadaheadlinereader", 
+            "text": "This extension of  AbstractFSBlockReader  parses lines 
from a block and binds the  readerContext  field to an instance of  
ReaderContext.ReadAheadLineReaderContext .  It is abstract because it doesn't 
provide an implementation of  convertToRecord(byte[] bytes)  since the user may 
want to convert the bytes that make a line into some other type.", 
+            "title": "AbstractFSReadAheadLineReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#readaheadlinereadercontext", 
+            "text": "In order to handle a line split across adjacent blocks, 
ReadAheadLineReaderContext always reads beyond the block boundary and ignores 
the bytes till the first end-of-line character of all the blocks except the 
first block of the file. This ensures that no line is missed or incomplete.  
This is one of the most common ways of handling a split record. It doesn't 
require any further information to decide if a line is complete. However, the 
cost of this consistent way to handle a line split is that it always reads from 
the next block.", 
+            "title": "ReadAheadLineReaderContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstractfslinereader", 
+            "text": "Similar to  AbstractFSReadAheadLineReader , even this 
parses lines from a block. However, it binds the  readerContext  field to an 
instance of  ReaderContext.LineReaderContext .", 
+            "title": "AbstractFSLineReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#linereadercontext", 
+            "text": "This handles the line split differently from  
ReadAheadLineReaderContext . It doesn't always read from the next block. If the 
end of the last line is aligned with the block boundary then it stops 
processing the block. It does read from the next block when the boundaries are 
not aligned, that is, last line extends beyond the block boundary. The result 
of this is an inconsistency in reading the next block.  When the boundary of 
the last line of the previous block was aligned with its block, then the first 
line of the current block is a valid line. However, in the other case the bytes 
from the block start offset to the first end-of-line character should be 
ignored. Therefore, this means that any record formed by this reader context 
has to be validated. For example, if the lines are of fixed size then size of 
each record can be validated or if each line begins with a special field then 
that knowledge can be used to check if a record is complete.  If the 
validations of completeness fails for a line then  convertToRecord(byte[] bytes)  should 
return null.", 
+            "title": "LineReaderContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#fsslicereader", 
+            "text": "A concrete extension of  AbstractFSBlockReader  that 
reads fixed-size  byte[]  from a block and emits the byte array wrapped in  
com.datatorrent.netlet.util.Slice .  This operator binds the  readerContext  to 
an instance of  ReaderContext.FixedBytesReaderContext .", 
+            "title": "FSSliceReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#fixedbytesreadercontext", 
+            "text": "This implementation of  ReaderContext  never reads beyond 
a block boundary which can result in the last  byte[]  of a block to be of a 
shorter length than the rest of the records.", 
+            "title": "FixedBytesReaderContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#configuration_1", 
+            "text": "readerContext.length : length of each record. By default, 
this is initialized to the default hdfs block size.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": 
"/operators/block_reader/#partitioner-and-statslistener", 
+            "text": "The logical instance of the block reader acts as the 
Partitioner (unless a custom partitioner is set using the operator attribute -  
PARTITIONER ) as well as a StatsListener. This is because the  
AbstractBlockReader  implements both the  com.datatorrent.api.Partitioner  and  
com.datatorrent.api.StatsListener  interfaces and provides an implementation of 
 definePartitions(...)  and  processStats(...)  which make it auto-scalable.", 
+            "title": "Partitioner and StatsListener"
+        }, 
+ 

<TRUNCATED>
