http://git-wip-us.apache.org/repos/asf/apex-site/blob/b25c090d/docs/malhar-3.7/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/docs/malhar-3.7/mkdocs/search_index.json 
b/docs/malhar-3.7/mkdocs/search_index.json
new file mode 100644
index 0000000..5ded716
--- /dev/null
+++ b/docs/malhar-3.7/mkdocs/search_index.json
@@ -0,0 +1,2134 @@
+{
+    "docs": [
+        {
+            "location": "/", 
+            "text": "Apache Apex Malhar\n\n\nApache Apex Malhar is an open 
source operator and codec library that can be used with the \nApache Apex\n 
platform to build real-time streaming applications.  Enabling users to extract 
value quickly, Malhar operators help get data in, analyze it in real-time, and 
get data out of Hadoop.  In addition to the operators, the library contains a 
number of example applications, demonstrating operator features and 
capabilities.\n\n\n\n\nCapabilities common across Malhar operators\n\n\nFor 
most streaming platforms, connectors are afterthoughts and often end up being 
simple \u2018bolt-ons\u2019 to the platform. As a result they often cause 
performance issues or data loss when put through failure scenarios and 
scalability requirements. Malhar operators do not face these issues as they 
were designed to be integral parts of Apex. Hence, they have the following core 
streaming runtime capabilities\n\n\n\n\nFault tolerance\n \u2013 Malhar 
operators where applicable have fault tolerance built in. They use the checkpoint capability 
provided by the framework to ensure that there is no data loss under ANY 
failure scenario.\n\n\nProcessing guarantees\n \u2013 Malhar operators where 
applicable provide out of the box support for ALL three processing guarantees 
\u2013 exactly once, at-least once, and at-most once WITHOUT requiring the user 
to write any additional code.  Some operators, like MQTT operator, deal with 
source systems that can not track processed data and hence need the operators 
to keep track of the data.  Malhar has support for a generic operator that uses 
alternate storage like HDFS to facilitate this.  Finally for databases that 
support transactions or support any sort of atomic batch operations Malhar 
operators can do exactly once down to the tuple level.\n\n\nDynamic updates\n 
\u2013 Based on changing business conditions you often have to tweak several 
parameters used by the operators in your streaming application without 
incurring any application downtime. You can also change properties of a Malhar 
operator at runtime without having to bring down the application.\n\n\nEase of 
extensibility\n \u2013 Malhar operators are based on templates that are easy to 
extend.\n\n\nPartitioning support\n \u2013 In streaming applications the input 
data stream often needs to be partitioned based on the contents of the stream. 
Also for operators that ingest data from external systems partitioning needs to 
be done based on the capabilities of the external system.  For example with 
Kafka, the operator can automatically scale up or down based on the changes in 
the number of Kafka partitions.\n\n\n\n\nOperator Library 
Overview\n\n\nInput/output connectors\n\n\nBelow is a summary of the various 
sub categories of input and output operators. Input operators also have a 
corresponding output operator\n\n\n\n\nFile Systems\n \u2013 Most streaming 
analytics use cases require the data to be stored in HDFS or perhaps S3 if the 
application is running in AWS.  Users often need to re-run their streaming 
analytical applications against historical data or consume data from upstream 
processes that are perhaps writing to some NFS share.  Apex supports input & 
output operators for HDFS, S3, NFS & Local Files.  There are also File 
Splitter and Block Reader operators, which can accelerate processing of large 
files by splitting and parallelizing the work across non-overlapping sets of 
file blocks.\n\n\nRelational Databases\n \u2013 Most stream processing use 
cases require some reference data lookups to enrich, tag or filter streaming 
data. There is also a need to save results of the streaming analytical 
computation to a database so an operational dashboard can see them. Apex 
supports a JDBC operator so you can read/write data from any JDBC compliant 
RDBMS like Oracle, MySQL, Sqlite, etc.\n\n\nNoSQL Databases\n \u2013 NoSQL 
key-value pair databases like Cassandra & HBase are a common part of streaming 
analytics application architectures to look up reference data or store results.  Malhar has 
operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and 
CouchDB.\n\n\nMessaging Systems\n \u2013 Kafka, JMS, and similar systems are 
the workhorses of messaging infrastructure in most enterprises.  Malhar has a 
robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, 
and RabbitMQ messages.\n\n\nNotification Systems\n \u2013 Malhar includes an 
operator for sending notifications via SMTP.\n\n\nIn-memory Databases & 
Caching platforms\n - Some streaming use cases need instantaneous access to 
shared state across the application. Caching platforms and in-memory databases 
serve this purpose really well. To support these use cases, Malhar has 
operators for memcached and Redis.\n\n\nSocial Media\n - Malhar includes an 
operator to connect to the popular Twitter stream fire hose.\n\n\nProtocols\n - 
Malhar provides connectors that can communicate in HTTP, RSS, Socket, 
WebSocket, FTP, and MQTT.\n\n\n\n\nParsers\n\n\nThere are many industry vertical specific data 
formats that a streaming application developer might need to parse. Often there 
are existing parsers available for these that can be directly plugged into an 
Apache Apex application. For example in the Telco space, a Java based CDR 
parser can be directly plugged into Apache Apex operator. To further simplify 
development experience, Malhar also provides some operators for parsing common 
formats like XML (DOM & SAX), JSON (flat map converter), Apache log files, 
syslog, etc.\n\n\nStream manipulation\n\n\nStreaming data inevitably needs 
processing to clean, filter, tag, summarize, etc. The goal of Malhar is to 
enable the application developer to focus on WHAT needs to be done to the 
stream to get it in the right format and not worry about the HOW.  Malhar has 
several operators to perform the common stream manipulation actions like \u2013 
GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, 
Outer join, Select, Update etc.\n\n\nCompute\n\n\nOne of the most important 
promises of a streaming analytics platform like Apache Apex is the ability to 
do analytics in real-time. However delivering on the promise becomes really 
difficult when the platform does not provide out of the box operators to 
support variety of common compute functions as the user then has to worry about 
making these scalable, fault tolerant, stateful, etc.  Malhar takes this 
responsibility away from the application developer by providing a variety of 
out of the box computational operators.\n\n\nBelow is just a snapshot of the 
compute operators available in Malhar\n\n\n\n\nStatistics and math - Various 
mathematical and statistical computations over application defined time 
windows.\n\n\nFiltering and pattern matching\n\n\nSorting, maps, frequency, 
TopN, BottomN\n\n\nRandom data generators\n\n\n\n\nLanguages 
Support\n\n\nMigrating to a new platform often requires re-use of the existing 
code that would be difficult or time-consuming to re-write.  With this in mind, Malhar supports 
invocation of code written in other languages by wrapping them in one of the 
library operators, and allows execution of software written 
in:\n\n\n\n\nJavaScript\n\n\nPython\n\n\nR\n\n\nRuby", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#apache-apex-malhar", 
+            "text": "Apache Apex Malhar is an open source operator and codec 
library that can be used with the  Apache Apex  platform to build real-time 
streaming applications.  Enabling users to extract value quickly, Malhar 
operators help get data in, analyze it in real-time, and get data out of 
Hadoop.  In addition to the operators, the library contains a number of example 
applications, demonstrating operator features and capabilities.", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#capabilities-common-across-malhar-operators", 
+            "text": "For most streaming platforms, connectors are 
afterthoughts and often end up being simple \u2018bolt-ons\u2019 to the 
platform. As a result they often cause performance issues or data loss when put 
through failure scenarios and scalability requirements. Malhar operators do not 
face these issues as they were designed to be integral parts of Apex. Hence, 
they have the following core streaming runtime capabilities   Fault tolerance  
\u2013 Malhar operators where applicable have fault tolerance built in. They 
use the checkpoint capability provided by the framework to ensure that there is 
no data loss under ANY failure scenario.  Processing guarantees  \u2013 Malhar 
operators where applicable provide out of the box support for ALL three 
processing guarantees \u2013 exactly once, at-least once, and at-most once 
WITHOUT requiring the user to write any additional code.  Some operators, like 
MQTT operator, deal with source systems that can not track processed data and 
hence need the operators to keep track of the data.  Malhar has support for a generic 
operator that uses alternate storage like HDFS to facilitate this.  Finally for 
databases that support transactions or support any sort of atomic batch 
operations Malhar operators can do exactly once down to the tuple level.  
Dynamic updates  \u2013 Based on changing business conditions you often have to 
tweak several parameters used by the operators in your streaming application 
without incurring any application downtime. You can also change properties of a 
Malhar operator at runtime without having to bring down the application.  Ease 
of extensibility  \u2013 Malhar operators are based on templates that are easy 
to extend.  Partitioning support  \u2013 In streaming applications the input 
data stream often needs to be partitioned based on the contents of the stream. 
Also for operators that ingest data from external systems partitioning needs to 
be done based on the capabilities of the external system.  
For example with Kafka, the operator can automatically scale up or down based on 
the changes in the number of Kafka partitions.", 
+            "title": "Capabilities common across Malhar operators"
+        }, 
+        {
+            "location": "/#operator-library-overview", 
+            "text": "", 
+            "title": "Operator Library Overview"
+        }, 
+        {
+            "location": "/#inputoutput-connectors", 
+            "text": "Below is a summary of the various sub categories of input 
and output operators. Input operators also have a corresponding output operator 
  File Systems  \u2013 Most streaming analytics use cases require the data to 
be stored in HDFS or perhaps S3 if the application is running in AWS.  Users 
often need to re-run their streaming analytical applications against historical 
data or consume data from upstream processes that are perhaps writing to some 
NFS share.  Apex supports input & output operators for HDFS, S3, NFS & Local 
Files.  There are also File Splitter and Block Reader operators, which can 
accelerate processing of large files by splitting and parallelizing the work 
across non-overlapping sets of file blocks.  Relational Databases  \u2013 Most 
stream processing use cases require some reference data lookups to enrich, tag 
or filter streaming data. There is also a need to save results of the streaming 
analytical computation to a database so an operational dashboard can see them. 
Apex supports a JDBC operator so you can read/write data 
from any JDBC compliant RDBMS like Oracle, MySQL, Sqlite, etc.  NoSQL Databases 
 \u2013 NoSQL key-value pair databases like Cassandra & HBase are a common part 
of streaming analytics application architectures to look up reference data or 
store results.  Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, 
MongoDB, and CouchDB.  Messaging Systems  \u2013 Kafka, JMS, and similar 
systems are the workhorses of messaging infrastructure in most enterprises.  
Malhar has a robust, industry-tested set of operators to read and write Kafka, 
JMS, ZeroMQ, and RabbitMQ messages.  Notification Systems  \u2013 Malhar 
includes an operator for sending notifications via SMTP.  In-memory Databases   
Caching platforms  - Some streaming use cases need instantaneous access to 
shared state across the application. Caching platforms and in-memory databases 
serve this purpose really well. To support these use cases, Malhar 
has operators for memcached and Redis.  Social Media  - Malhar includes an 
operator to connect to the popular Twitter stream fire hose.  Protocols  - 
Malhar provides connectors that can communicate in HTTP, RSS, Socket, 
WebSocket, FTP, and MQTT.", 
+            "title": "Input/output connectors"
+        }, 
+        {
+            "location": "/#parsers", 
+            "text": "There are many industry vertical specific data formats 
that a streaming application developer might need to parse. Often there are 
existing parsers available for these that can be directly plugged into an 
Apache Apex application. For example in the Telco space, a Java based CDR 
parser can be directly plugged into Apache Apex operator. To further simplify 
development experience, Malhar also provides some operators for parsing common 
formats like XML (DOM & SAX), JSON (flat map converter), Apache log files, 
syslog, etc.", 
+            "title": "Parsers"
+        }, 
+        {
+            "location": "/#stream-manipulation", 
+            "text": "Streaming data inevitably needs processing to clean, 
filter, tag, summarize, etc. The goal of Malhar is to enable the application 
developer to focus on WHAT needs to be done to the stream to get it in the 
right format and not worry about the HOW.  Malhar has several operators to 
perform the common stream manipulation actions like \u2013 GroupBy, Join, 
Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, Outer join, Select, 
Update etc.", 
+            "title": "Stream manipulation"
+        }, 
+        {
+            "location": "/#compute", 
+            "text": "One of the most important promises of a streaming 
analytics platform like Apache Apex is the ability to do analytics in 
real-time. However delivering on the promise becomes really difficult when the 
platform does not provide out of the box operators to support variety of common 
compute functions as the user then has to worry about making these scalable, 
fault tolerant, stateful, etc.  Malhar takes this responsibility away from the 
application developer by providing a variety of out of the box computational 
operators.  Below is just a snapshot of the compute operators available in 
Malhar   Statistics and math - Various mathematical and statistical 
computations over application defined time windows.  Filtering and pattern 
matching  Sorting, maps, frequency, TopN, BottomN  Random data generators", 
+            "title": "Compute"
+        }, 
+        {
+            "location": "/#languages-support", 
+            "text": "Migrating to a new platform often requires re-use of the 
existing code that would be difficult or time-consuming to re-write.  With this 
in mind, Malhar supports invocation of code written in other languages by 
wrapping them in one of the library operators, and allows execution of software 
written in:   JavaScript  Python  R  Ruby", 
+            "title": "Languages Support"
+        }, 
+        {
+            "location": "/apis/calcite/", 
+            "text": "Apache Apex is a unified stream and batch processing 
engine that enables application developers to process data at very high 
throughput with low latency. Although the different types of data have 
different processing needs, SQL remains a popular and a generic way for 
processing data. To ensure that existing ETL developers and developers who are 
well versed with Database applications adopt stream processing application 
development with ease, integration of SQL with Apex was needed. Being a popular 
Apache project, Apache Calcite was chosen for this purpose and its integration 
with Apex is described below.\n\n\nApex-Calcite Integration\n\n\nApache Calcite 
is a highly customizable engine for parsing and planning queries on relational 
data from various data sources; it provides storage independent optimization of 
queries and ways to integrate them into other frameworks which would like to 
take advantage and expose SQL capability to their users. For details, please 
read at \nApache Calcite Website\n. \n\n\nParticularly in SQL on Apex, Calcite 
processes a query and then creates relational algebra to create processing 
pipelines. These relational algebra processing pipelines are converted to a DAG 
with a set of operators to perform business logic on streaming 
data.\n\n\n\n\nAbove figure explains how SQL query gets converted to Apex 
DAG.\n\n\n\n\nUser specified query is processed by Calcite Query planner; this 
involves parsing and optimizing the query to generate Relation Expression Tree. 
\n\n\nThis Relation Expression Tree is received by Apache Apex\u2019s SQL 
module to finally convert to an Apex DAG having series of 
operators.\n\n\n\n\nOne peculiarity of Calcite queries is that the data source 
and destination need not be RDBMS systems; in the above example, \nFile\n 
refers to a file in the filesystem and \nKafka\n to a Kafka message broker. 
Calcite allows Apex to register table sources and destinations as anything 
which can return row-type results. So a \u201cscan\u201d relational expression gets converted to 
\u201cKafkaInputOperator + ParseOperator\u201d, a result of which is series of 
POJOs reflecting a Row Type. Similarly, the \u201cinsert\u201d Relational 
Expression translated to \u201cFormatOperator + 
FileOutputOperator\u201d.\n\n\nFor more details about the integration, click 
\nhere\n.\n\n\nSQL APIs for Apache Apex\n\n\nListed below are the Java APIs 
which can be used by SQL/Apex users to create a DAG in the implementation of 
the \npopulateDAG\n method of the \nStreamingApplication\n 
interface.\n\n\n\n\n\n\n\n\nAPI\n\n\nDescription\n\n\n\n\n\n\n\n\n\n\nSQLExecEnvironment.getEnvironment()\n\n\nCreates
 a new SQL execution 
environment\n\n\n\n\n\n\nSQLExecEnvironment.registerTable(tableName, 
endpointInstance)\n\n\nRegisters a new abstract table with existing 
environment. \nendpointInstance\n is an object of type \nEndpoint\n which 
defines a 
table.\n\n\n\n\n\n\nSQLExecEnvironment.registerFunction(sqlFunctionName, 
holderClass, staticFunctionName)\n\n\nRegisters a new User Defined Scalar 
function\n\n\n\n\n\n\nSQLExecEnvironment.executeSQL(dag, 
sqlStatement)\n\n\nCreates a DAG for a particular SQL 
statement\n\n\n\n\n\n\n\n\nUsage of above APIs is described in detail in 
following sections.\n\n\nExample 1: Pure Style SQL Application\n\n\nWith Apache 
Calcite Integration, you can use SQL queries across different data sources and 
provide UDFs (User Defined Functions) as per your business logic. This example 
will use a Kafka topic as the source and a HDFS file as the 
destination.\nFollowing application code will be used to explain APIs. Actual 
source code can be found \nhere\n.\n\n\n  public class PureStyleSQLApplication 
implements StreamingApplication\n  {\n    @Override\n    public void 
populateDAG(DAG dag, Configuration conf)\n    {\n       // Create new 
SQLExecEnvironment\n       SQLExecEnvironment sqlEnv = 
SQLExecEnvironment.getEnvironment();\n\n      // This is a string that defines 
a schema and is discussed in more detail in \nRegistering tables with SQLExecEnvironment\n 
section \n      String inputSchemaString = \n...\n;\n\n      // similar to 
inputSchemaString, we also need to define outputSchemaString\n      String 
outputSchemaString = \n...\n;\n\n       // Register KafkaEndpoint as \nORDERS\n 
table with kafka topic and data format as CSV\n       sqlEnv = 
sqlEnv.registerTable( \n                                    \nORDERS\n, \n      
                              new KafkaEndpoint(\nlocalhost:9090\n, \n          
                                            \ninputTopic\n, \n                  
                                    new CSVMessageFormat(inputSchemaString))\n  
                                );\n\n       // Register FileEndpoint as 
\nSALES\n table with file path and data format as CSV\n       sqlEnv = 
sqlEnv.registerTable( \n                                    \nSALES\n, \n       
                             new FileEndpoint(\n/tmp/output\n, \n               
         
                              \nout.file\n, \n                                  
                   new CSVMessageFormat(outputSchemaString))\n                  
                );\n\n       // Register scalar SQL UDF \n       sqlEnv = 
sqlEnv.registerFunction(\nAPEXCONCAT\n, PureStyleSQLApplication.class, 
\napex_concat_str\n);\n\n       // Converting SQL statement to DAG \n       
String sql = \nINSERT INTO SALES \n                       SELECT STREAM 
ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 
7)) \n                       FROM ORDERS \n                       WHERE ID > 3 
AND PRODUCT LIKE 'paint%'\n;\n       sqlEnv.executeSQL(dag, sql);\n    }// 
populateDAG finished\n\n    public static String apex_concat_str(String s1, 
String s2)\n    {\n        return s1 + s2;\n    } \n  }\n\n\n\n\nConstructing 
SQLExecEnvironment\n\n\nThe class \nSQLExecEnvironment\n provides a starting 
point and a simple way to define metadata needed for running a SQL statement; a 
new instance of this class is returned by the \ngetEnvironment\n static method.  
\n\n\n  // Creates SQLExecEnvironment instance by using static method 
getEnvironment\n  SQLExecEnvironment sqlEnv = 
SQLExecEnvironment.getEnvironment();\n\n\n\n\nRegistering tables with 
SQLExecEnvironment\n\n\nNext, we need to register tables which can be used in a 
query. For this purpose, we can use \nregisterTable\n method from 
SQLExecEnvironment.\n\n\n  // Register KafkaEndpoint as \nORDERS\n table with 
kafka topic and data format as CSV\n  sqlEnv = sqlEnv.registerTable( \n         
                     \nORDERS\n, \n                              new 
KafkaEndpoint(\nlocalhost:9090\n, \n                                            
    \ninputTopic\n, \n                                                new 
CSVMessageFormat(inputSchemaString))\n                            );\n\n  // 
Register FileEndpoint as \nSALES\n table with file path and data format as 
CSV\n  sqlEnv = sqlEnv.registerTable( \n              
                 \nSALES\n, \n                              new 
FileEndpoint(\n/tmp/output\n, \n                                               
\nout.file\n, \n                                               new 
CSVMessageFormat(outputSchemaString))\n                            
);\n\n\n\n\n\"registerTable\"\n method takes the name of the table and an 
instance of endpoint as parameters. Endpoint signifies data storage mechanism 
and type of source/destination for the data. These endpoints require different 
types of configurations and possibly data formats. The data format is defined 
using an implementation of the \nMessageFormat\n interface; the 
\nCSVMessageFormat\n implementation can be configured with a schema string as 
follows:\n\n\n{\n  \nseparator\n: \n,\n,\n  \nquoteChar\n: \n\\\n,\n  
\nfields\n: [\n    {\n      \nname\n: \nRowTime\n,\n      \ntype\n: \nDate\n,\n 
     \nconstraints\n: {\n        \nformat\n: \ndd/MM/yyyy hh:mm:ss Z\n\n      
}\n    },\n    {\n      \nname\n: \nid\n,\n
       \ntype\n: \nInteger\n\n    },\n    {\n      \nname\n: \nProduct\n,\n     
 \ntype\n: \nString\n\n    },\n    {\n      \nname\n: \nunits\n,\n      
\ntype\n: \nInteger\n\n    }\n  ]\n}\n\n\n\n\nThe schema string is a JSON 
string defining a separator character, quote character for fields with String 
type and a list of fields where, for each field, its name, type and any 
additional constraints are specified.\n\n\nFollowing data endpoints are 
supported: \n\n\n\n\nKafkaEndpoint\n\n: To define a Kafka Endpoint we need to 
specify the Kafka broker (as host:port), topic name and MessageFormat as seen 
in line 1 in the code above.\n\n\nFileEndpoint\n\n: It needs to be configured 
with the filesystem path, file name and MessageFormat as in line 2 in the code 
above. \n\n\nStreamEndpoint\n \n: This allows us to connect existing operator 
output or input ports to the SQL query as a data source or sink respectively. 
StreamEndpoint needs immediate downstream operator's input port or immediate 
upstream operator's output port and the field mapping for CSV data or POJO class. 
This will be explained in detail in next \nexample\n.\n\n\n\n\nUsing User 
Defined Functions (UDF) in a SQL query\n\n\nWe can use our own scalar UDF, 
implemented in Java, in a SQL statement for data manipulation but first, we 
need to register the function with the execution environment by using the 
\nregisterFunction\n method.\n\n\n  sqlEnv = 
sqlEnv.registerFunction(\nAPEXCONCAT\n, PureStyleSQLApplication.class, 
\napex_concat_str\n);\n\n\n\n\nIn above code, \nregisterFunction\n takes the 
UDF name to be used in SQL, JAVA class which implements the static method and 
name of that method as parameters. \nThe static method \napex_concat_str\n 
takes two String objects as input parameters from the SQL query.\n\n\n  public 
static String apex_concat_str(String s1, String s2)\n  {\n    return s1 + s2;\n 
 }\n\n\n\n\nThe scalar UDF \"APEXCONCAT\" that was registered above can be used 
in SQL as described below. FLOOR and SUBSTRING are standard SQL scalar functions supported by Apache 
Calcite.\n\n\nINSERT INTO SALES \n       SELECT STREAM ROWTIME, FLOOR(ROWTIME 
TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n       FROM ORDERS 
\n       WHERE ID > 3 AND PRODUCT LIKE 'paint%'\n\n\n\n\nTo read about all 
functions and operators supported by Apache Calcite, click 
\nhere\n.\n\n\nExecuting SQL Query\n\n\nFinally to execute the query we need to 
use \nexecuteSQL\n function with a DAG and SQL statement as parameters.\n\n\n  
// Converting SQL statement to DAG \n  String sql = \nINSERT INTO SALES \n      
          SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', 
SUBSTRING(PRODUCT, 6, 7)) \n                FROM ORDERS \n                WHERE 
ID > 3 AND PRODUCT LIKE 'paint%'\n;\n  sqlEnv.executeSQL(dag, 
sql);\n\n\n\n\nWhen executeSQL method is called, the query goes through various 
phases like conversion to relational algebra, optimization and planning in 
Calcite to generate Relation Expression Tree. \nThe generated Relation Expression Tree is consumed by 
Apex SQL and converted to a DAG using operators available in Apache Malhar. In 
the above example, the ORDERS and SALES tables will be converted to the 
operators KafkaInputOperator and FileOutputFormatter respectively, paired with 
the CSVParser formatter in both cases.\n\n\nA \nWHERE\n clause is used in this 
query; it defines the desired filter for rows and is converted to a 
\nFilterTransformOperator\n in the DAG. Similarly, the projection defining 
desired columns is converted into another instance of the 
\nFilterTransformOperator\n. The DAG created for this application will look 
like this:\n\n\n\n\n\n\nExample 2: Fusion Style SQL Application\n\n\nAs 
described in Pure Style SQL application, we can use different data sources as 
source and sink while developing Apex Applications with Calcite. This example 
will describe how we can develop Apex application with Apex stream as abstract 
table for SQL query. Actual source code can be found \nhere\n.\n\n\n  // Define Kafka Input Operator for 
reading data from Kafka\n  KafkaSinglePortInputOperator kafkaInput = 
dag.addOperator(\nKafkaInput\n, \n                                              
             KafkaSinglePortInputOperator.class);\n\n  
kafkaInput.setInitialOffset(\nEARLIEST\n);\n\n  // Add CSVParser\n  CsvParser 
csvParser = dag.addOperator(\nCSVParser\n, CsvParser.class);\n  
dag.addStream(\nKafkaToCSV\n, kafkaInput.outputPort, 
csvParser.in);\n\n\n\n\nOnce we define DAG with KafkaInputOperator and 
CSVParser, it can parse data from Kafka topic. Up to this point, this is a 
regular Apex application without SQL. After this, we can register the output of 
CSVParser as a table using \nStreamEndpoint\n to run a SQL statement. This way 
we can develop applications in fusion style where the DAG is part SQL and part 
regular Apex DAG.\n\n\nThe following code will describe how we can define 
StreamEndpoint. \n\n\n  SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();\n  Map\nString, Class\n fieldMapping = ImmutableMap.\nString, 
Class\nof(\nRowTime\n, Date.class,\n                                            
                     \nid\n, Integer.class,\n                                   
                              \nProduct\n, String.class,\n                      
                                           \nunits\n, Integer.class);\n  sqlEnv 
= sqlEnv.registerTable(\nFROMCSV\n, new StreamEndpoint(csvParser.out, 
fieldMapping));\n\n\n\n\nTo read existing data stream, we need to register it 
as a table with SQL execution environment with the name of the table and 
StreamEndpoint. StreamEndpoint can serve as input table or output table in SQL. 
For input table configuration we need to initialise StreamEndpoint with 
immediate upstream operator's output port and fieldMapping or POJO class for 
input tuple(as shown above). For output table configuration, we need to 
initialise StreamEndpoint with immediate downstream operator's input port and 
fieldMapping or POJO class for output tuple. Once we register StreamEndpoint 
as a table with a name in SQL Execution Environment, it can be used as a table 
in SQL statement similar to other endpoints.\n\n\nWhen executeSQL method is 
called, the specified SQL is converted to DAG as described in the previous 
section. Both examples read CSV data from Kafka. But in the pure style SQL 
example the \nKafkaInputOperator\n and \nCSVParser\n in the DAG are created 
implicitly by the use of the KafkaEndpoint usage while in the fusion style 
example, they are explicitly defined as part of the DAG which is then extended 
with other operators as shown in the image below. \n\n\n\n\nFor all 
Apex-Calcite integration examples, click \nhere\n. \n\n\nOngoing 
efforts\n\n\nApache Apex-Calcite integration provides support for basic queries 
and efforts are underway to extend support for aggregations, sorting and other 
features using Tumbling, Hopping and Session Windows.\nSupport for JSON, XML 
and JDBC endpoints is also planned. The goal of this integration is to make developing a 
streaming application using SQL easy so that SQL Developers don't have to write 
any java code at all.", 
+            "title": "SQL"
+        }, 
+        {
+            "location": "/apis/calcite/#apex-calcite-integration", 
+            "text": "Apache Calcite is a highly customizable engine for 
parsing and planning queries on relational data from various data sources; it 
provides storage independent optimization of queries and ways to integrate them 
into other frameworks which would like to take advantage and expose SQL 
capability to their users. For details, please see the  Apache Calcite Website 
.   In SQL on Apex in particular, Calcite processes a query and then creates 
relational algebra to create processing pipelines. These relational algebra 
processing pipelines are converted to a DAG with a set of operators to perform 
business logic on streaming data.   The figure above explains how a SQL query gets 
converted to an Apex DAG.   The user-specified query is processed by the Calcite query 
planner; this involves parsing and optimizing the query to generate a Relation 
Expression Tree.   This Relation Expression Tree is received by Apache 
Apex\u2019s SQL module, which finally converts it to an Apex DAG having a series of 
operators.
    One peculiarity of Calcite queries is that the data source and destination 
need not be RDBMS systems; in the above example,  File  refers to a file in the 
filesystem and  Kafka  to a Kafka message broker. Calcite allows Apex to 
register table sources and destinations as anything that can return row-type 
results. So a \u201cscan\u201d relational expression gets converted to 
\u201cKafkaInputOperator + ParseOperator\u201d, the result of which is a series of 
POJOs reflecting a Row Type. Similarly, the \u201cinsert\u201d relational 
expression is translated to \u201cFormatOperator + FileOutputOperator\u201d.  For 
more details about the integration, click  here .", 
+            "title": "Apex-Calcite Integration"
+        }, 
+        {
+            "location": "/apis/calcite/#sql-apis-for-apache-apex", 
+            "text": "Listed below are the Java APIs which can be used by 
SQL/Apex users to create a DAG in the implementation of the  populateDAG  
method of the  StreamingApplication  interface.     API  Description      
SQLExecEnvironment.getEnvironment()  Creates a new SQL execution environment    
SQLExecEnvironment.registerTable(tableName, endpointInstance)  Registers a new 
abstract table with existing environment.  endpointInstance  is an object of 
type  Endpoint  which defines a table.    
SQLExecEnvironment.registerFunction(sqlFunctionName, holderClass, 
staticFunctionName)  Registers a new User Defined Scalar function    
SQLExecEnvironment.executeSQL(dag, sqlStatement)  Creates a DAG for a 
particular SQL statement     Usage of the above APIs is described in detail in 
the following sections.", 
+            "title": "SQL APIs for Apache Apex"
+        }, 
+        {
+            "location": "/apis/calcite/#example-1-pure-style-sql-application", 
+            "text": "With Apache Calcite Integration, you can use SQL queries 
across different data sources and provide UDFs (User Defined Functions) as per 
your business logic. This example will use a Kafka topic as the source and an 
HDFS file as the destination.\nThe following application code will be used to 
explain the APIs. Actual source code can be found  here .    public class 
PureStyleSQLApplication implements StreamingApplication\n  {\n    @Override\n   
 public void populateDAG(DAG dag, Configuration conf)\n    {\n       // Create 
new SQLExecEnvironment\n       SQLExecEnvironment sqlEnv = 
SQLExecEnvironment.getEnvironment();\n\n      // This is a string that defines 
a schema and is discussed in more detail in  Registering tables with 
SQLExecEnvironment  section \n      String inputSchemaString =  ... ;\n\n      
// similar to inputSchemaString, we also need to define outputSchemaString\n    
  String outputSchemaString =  ... ;\n\n       // Register KafkaEndpoint as  
ORDERS  table with Kafka topic and data format as CSV\n       sqlEnv = 
sqlEnv.registerTable( \n  
                                   ORDERS , \n                                  
  new KafkaEndpoint( localhost:9090 , \n                                        
               inputTopic , \n                                                  
    new CSVMessageFormat(inputSchemaString))\n                                  
);\n\n       // Register FileEndpoint as  SALES  table with file path and data 
format as CSV\n       sqlEnv = sqlEnv.registerTable( \n                         
            SALES , \n                                    new FileEndpoint( 
/tmp/output , \n                                                      out.file 
, \n                                                     new 
CSVMessageFormat(outputSchemaString))\n                                  );\n\n 
      // Register scalar SQL UDF \n       sqlEnv = sqlEnv.registerFunction( 
APEXCONCAT , PureStyleSQLApplication.class,  apex_concat_str );\n\n
        // Converting SQL statement to DAG \n       String sql =  INSERT INTO 
SALES \n                       SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), 
APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n                       FROM 
ORDERS \n                       WHERE ID   3 AND PRODUCT LIKE 'paint%' ;\n      
 sqlEnv.executeSQL(dag, sql);\n    }// populateDAG finished\n\n    public 
static String apex_concat_str(String s1, String s2)\n    {\n        return s1 + 
s2;\n    } \n  }", 
+            "title": "Example 1: Pure Style SQL Application"
+        }, 
+        {
+            "location": "/apis/calcite/#constructing-sqlexecenvironment", 
+            "text": "The class  SQLExecEnvironment  provides a starting point 
and a simple way to define metadata needed for running a SQL statement; a new 
instance of this class is returned by the  getEnvironment  static method.      
// Creates SQLExecEnvironment instance by using static method getEnvironment\n  
SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();", 
+            "title": "Constructing SQLExecEnvironment"
+        }, 
+        {
+            "location": 
"/apis/calcite/#registering-tables-with-sqlexecenvironment", 
+            "text": "Next, we need to register tables which can be used in a 
query. For this purpose, we can use the  registerTable  method of 
SQLExecEnvironment.    // Register KafkaEndpoint as  ORDERS  table with Kafka 
topic and data format as CSV\n  sqlEnv = sqlEnv.registerTable( \n               
                ORDERS , \n                              new KafkaEndpoint( 
localhost:9090 , \n                                                 inputTopic 
, \n                                                new 
CSVMessageFormat(inputSchemaString))\n                            );\n\n  // 
Register FileEndpoint as  SALES  table with file path and data format as CSV\n  
sqlEnv = sqlEnv.registerTable( \n                               SALES , \n      
                        new FileEndpoint( /tmp/output , \n                      
                          out.file , \n                                         
      new CSVMessageFormat(outputSchemaString))\n                            );  
The  \"registerTable\"  method takes the name of the table and an endpoint instance as 
parameters. An endpoint signifies the data storage mechanism and the type of 
source/destination for the data. These endpoints require different types of 
configurations and possibly data formats. The data format is defined using an 
implementation of the  MessageFormat  interface; the  CSVMessageFormat  
implementation can be configured with a schema string as follows:  {\n   
separator :  , ,\n   quoteChar :  \\ ,\n   fields : [\n    {\n       name :  
RowTime ,\n       type :  Date ,\n       constraints : {\n         format :  
dd/MM/yyyy hh:mm:ss Z \n      }\n    },\n    {\n       name :  id ,\n       
type :  Integer \n    },\n    {\n       name :  Product ,\n       type :  
String \n    },\n    {\n       name :  units ,\n       type :  Integer \n    
}\n  ]\n}  The schema string is a JSON string defining a separator character, 
quote character for fields with String type and a list of fields where, for 
each field, its name, type and any additional constraints are specified.  The 
following data endpoints are supported:    KafkaEndpoint \n: To define a Kafka 
Endpoint we need to 
specify the Kafka broker (as host:port), topic name and MessageFormat as seen 
in line 1 in the code above.  FileEndpoint \n: It needs to be configured with 
the filesystem path, file name and MessageFormat as in line 2 in the code 
above.   StreamEndpoint  \n: This allows us to connect existing operator output 
or input ports to the SQL query as a data source or sink respectively. 
StreamEndpoint needs the immediate downstream operator's input port or the immediate 
upstream operator's output port and the field mapping for CSV data or POJO 
class. This will be explained in detail in the next  example .", 
+            "title": "Registering tables with SQLExecEnvironment"
+        }, 
+        {
+            "location": 
"/apis/calcite/#using-user-defined-functions-udf-in-a-sql-query", 
+            "text": "We can use our own scalar UDF, implemented in Java, in a 
SQL statement for data manipulation but first, we need to register the function 
with the execution environment by using the  registerFunction  method.    
sqlEnv = sqlEnv.registerFunction( APEXCONCAT , PureStyleSQLApplication.class,  
apex_concat_str );  In the above code,  registerFunction  takes the UDF name to be 
used in SQL, the Java class which implements the static method and the name of that 
method as parameters. \nThe static method  apex_concat_str  takes two String 
objects as input parameters from the SQL query.    public static String 
apex_concat_str(String s1, String s2)\n  {\n    return s1 + s2;\n  }  The 
scalar UDF \"APEXCONCAT\" that was registered above can be used in SQL as 
described below. FLOOR and SUBSTRING are standard SQL scalar functions 
supported by Apache Calcite.  INSERT INTO SALES \n       SELECT STREAM ROWTIME, 
FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n      
 FROM ORDERS \n       WHERE ID   3 AND PRODUCT LIKE 'paint%'  To read about all 
functions and operators supported by Apache Calcite, click  here .", 
+            "title": "Using User Defined Functions (UDF) in a SQL query"
+        }, 
+        {
+            "location": "/apis/calcite/#executing-sql-query", 
+            "text": "Finally, to execute the query we need to call the  executeSQL  
method with a DAG and a SQL statement as parameters.    // Converting SQL 
statement to DAG \n  String sql =  INSERT INTO SALES \n                SELECT 
STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', 
SUBSTRING(PRODUCT, 6, 7)) \n                FROM ORDERS \n                WHERE 
ID   3 AND PRODUCT LIKE 'paint%' ;\n  sqlEnv.executeSQL(dag, sql);  When 
executeSQL method is called, the query goes through various phases like 
conversion to relational algebra, optimization and planning in Calcite to 
generate Relation Expression Tree. \nThe generated Relation Expression Tree is 
consumed by Apex SQL and converted to a DAG using operators available in Apache 
Malhar. In the above example, the ORDERS and SALES tables will be converted to 
the operators KafkaInputOperator and FileOutputFormatter respectively, paired 
with the CSVParser formatter in both cases.  A  WHERE  clause is used in this 
query; it defines the desired filter for rows and is converted to a  
FilterTransformOperator  in the DAG. Similarly, the projection defining the desired 
columns is converted into another instance of the  FilterTransformOperator . 
The DAG created for this application will look like this:", 
+            "title": "Executing SQL Query"
+        }, 
+        {
+            "location": 
"/apis/calcite/#example-2-fusion-style-sql-application", 
+            "text": "As described in the Pure Style SQL application, we can use 
different data sources as source and sink while developing Apex applications 
with Calcite. This example describes how we can develop an Apex application 
with an Apex stream as an abstract table for a SQL query. Actual source code can be 
found  here .    // Define Kafka Input Operator for reading data from Kafka\n  
KafkaSinglePortInputOperator kafkaInput = dag.addOperator( KafkaInput , \n      
                                                     
KafkaSinglePortInputOperator.class);\n\n  kafkaInput.setInitialOffset( EARLIEST 
);\n\n  // Add CSVParser\n  CsvParser csvParser = dag.addOperator( CSVParser , 
CsvParser.class);\n  dag.addStream( KafkaToCSV , kafkaInput.outputPort, 
csvParser.in);  Once we define the DAG with KafkaInputOperator and CSVParser, it 
can parse data from the Kafka topic. Up to this point, this is a regular Apex 
application without SQL. After this, we can register the output of CSVParser as 
a table using  StreamEndpoint  to run a SQL statement. This way we can develop 
applications in fusion style where the DAG is part SQL and part regular Apex DAG.  
The following code describes how we can define a StreamEndpoint.     
SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();\n  Map<String, Class>  
fieldMapping = ImmutableMap.<String, Class>of( RowTime , Date.class,\n          
                                                        id , Integer.class,\n   
                                                               Product , 
String.class,\n                                                                 
 units , Integer.class);\n  sqlEnv = sqlEnv.registerTable( FROMCSV , new 
StreamEndpoint(csvParser.out, fieldMapping));  To read an existing data stream, we 
need to register it as a table with the SQL execution environment with the name of 
the table and a StreamEndpoint. StreamEndpoint can serve as an input table or output 
table in SQL. For input table configuration we need to initialise 
StreamEndpoint with the immediate upstream operator's output port and the fieldMapping 
or POJO class for the input tuple (as shown above). For output table configuration, we 
need to initialise StreamEndpoint with the immediate downstream operator's input port 
and the fieldMapping or POJO class for the output tuple. Once we register StreamEndpoint 
as a table with a name in the SQL execution environment, it can be used as a table in 
a SQL statement like any other endpoint.  When the executeSQL method is called, 
the specified SQL is converted to DAG as described in the previous section. 
Both examples read CSV data from Kafka. However, in the pure style SQL example the  
KafkaInputOperator  and  CSVParser  in the DAG are created implicitly by the 
use of the KafkaEndpoint, while in the fusion style example they are 
explicitly defined as part of the DAG, which is then extended with other 
operators as shown in the image below.    For all Apex-Calcite integration 
examples, click  here .", 
+            "title": "Example 2: Fusion Style SQL Application"
+        }, 
+        {
+            "location": "/apis/calcite/#ongoing-efforts", 
+            "text": "Apache Apex-Calcite integration provides support for 
basic queries and efforts are underway to extend support for aggregations, 
sorting and other features using Tumbling, Hopping and Session 
Windows.\nSupport for JSON, XML and JDBC endpoints is also planned. The goal of 
this integration is to make developing a streaming application using SQL easy 
so that SQL developers don't have to write any Java code at all.", 
+            "title": "Ongoing efforts"
+        }, 
+        {
+            "location": "/operators/block_reader/", 
+            "text": "Block Reader\n\n\nThis is a scalable operator that reads 
and parses blocks of data sources into records. A data source can be a file or 
a message bus that contains records and a block defines a chunk of data in the 
source by specifying the block offset and the length of the source belonging to 
the block. \n\n\nWhy is it needed?\n\n\nA Block Reader is needed to parallelize 
reading and parsing of a single data source, for example a file. Simple 
parallelism of reading data sources can be achieved by multiple partitions 
reading different sources of the same type (for files see 
\nAbstractFileInputOperator\n), but Block Reader partitions can read blocks of 
the same source in parallel and parse them for records, ensuring that no record is 
duplicated or missed.\n\n\nClass Diagram\n\n\n\n\nAbstractBlockReader\n\n\nThis 
is the abstract implementation that serves as the base for different types of 
data sources. It defines how block metadata is processed. The flow diagram 
below describes the processing of block 
metadata.\n\n\n\n\nPorts\n\n\n\n\n\n\nblocksMetadataInput: input port on which 
block metadata are received.\n\n\n\n\n\n\nblocksMetadataOutput: output port on 
which block metadata are emitted if the port is connected. This port is useful 
when a downstream operator that receives records from block reader may also be 
interested to know the details of the corresponding 
blocks.\n\n\n\n\n\n\nmessages: output port on which tuples of type 
\ncom.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord\n are emitted. 
This class encapsulates a \nrecord\n and the \nblockId\n of the corresponding 
block.\n\n\n\n\n\n\nreaderContext\n\n\nThis is one of the most important fields 
in the block reader. It is of type 
\ncom.datatorrent.lib.io.block.ReaderContext\n and is responsible for fetching 
bytes that make a record. It also lets the reader know how many total bytes 
were consumed which may not be equal to the total bytes in a record because 
consumed bytes also include
  bytes for the record delimiter which may not be a part of the actual 
record.\n\n\nOnce the reader creates an input stream for the block (or uses the 
previously opened stream if the current block is the successor of the previous block) 
it initializes the reader context by invoking 
\nreaderContext.initialize(stream, blockMetadata, consecutiveBlock);\n. 
Initialize method is where any implementation of \nReaderContext\n can perform 
all the operations which have to be executed just before reading the block or 
create states which are used during the lifetime of reading the 
block.\n\n\nOnce the initialization is done, \nreaderContext.next()\n is called 
repeatedly until it returns \nnull\n. It is left to the \nReaderContext\n 
implementations to decide when a block is completely processed. In cases when a 
record is split across adjacent blocks, reader context may decide to read ahead 
of the current block boundary to completely fetch the split record (examples: 
\nLineReaderContext\n and \nReadAheadLineReaderContext\n). In other cases when 
there isn't a possibility of a split record (example: \nFixedBytesReaderContext\n), 
it returns \nnull\n immediately 
when the block boundary is reached. The return type of \nreaderContext.next()\n 
is of type \ncom.datatorrent.lib.io.block.ReaderContext.Entity\n which is just 
a wrapper for a \nbyte[]\n that represents the record and total bytes used in 
fetching the record.\n\n\nAbstract methods\n\n\n\n\n\n\nSTREAM setupStream(B 
block)\n: creating a stream for a block is dependent on the type of source 
which is not known to AbstractBlockReader. Sub-classes which deal with a 
specific data source provide this implementation.\n\n\n\n\n\n\nR 
convertToRecord(byte[] bytes)\n: this converts the array of bytes into the 
actual instance of record type.\n\n\n\n\n\n\nAuto-scalability\n\n\nBlock reader 
can auto-scale, that is, depending on the backlog (total number of all the 
blocks which are waiting in the \nblocksMetadataInput\n port queue of all 
partitions) it can create more partitions or reduce them. Details are discussed in the 
last section which covers the \npartitioner and 
stats-listener\n.\n\n\nConfiguration\n\n\n\n\nmaxReaders\n: when auto-scaling 
is enabled, this controls the maximum number of block reader partitions that 
can be created.\n\n\nminReaders\n: when auto-scaling is enabled, this controls 
the minimum number of block reader partitions that should always 
exist.\n\n\ncollectStats\n: this enables or disables auto-scaling. When it is 
set to \ntrue\n the stats (number of blocks in the queue) are collected and 
this triggers partitioning; otherwise auto-scaling is 
disabled.\n\n\nintervalMillis\n: when auto-scaling is enabled, this specifies 
the interval at which the reader will trigger the logic of computing the 
backlog and auto-scale.\n\n\n\n\n AbstractFSBlockReader\n\n\nThis abstract 
implementation deals with files. Different types of file systems that are 
implementations of \norg.apache.hadoop.fs.FileSystem\n are supported. The user 
can override the \ngetFSInstance()\n method to create an instance 
of a specific \nFileSystem\n. By default, the filesystem instance is created from 
the filesystem URI that comes from the default Hadoop 
configuration.\n\n\nprotected FileSystem getFSInstance() throws 
IOException\n{\n  return FileSystem.newInstance(configuration);\n}\n\n\n\n\nIt 
uses this filesystem instance to setup a stream of type 
\norg.apache.hadoop.fs.FSDataInputStream\n to read the 
block.\n\n\n@Override\nprotected FSDataInputStream 
setupStream(BlockMetadata.FileBlockMetadata block) throws IOException\n{\n  
return fs.open(new Path(block.getFilePath()));\n}\n\n\n\n\nAll the ports and 
configurations are derived from the super class. It doesn't provide an 
implementation of \nconvertToRecord(byte[] bytes)\n method which is delegated 
to concrete sub-classes.\n\n\nExample Application\n\n\nThis simple DAG 
demonstrates how any concrete implementation of \nAbstractFSBlockReader\n can 
be plugged into an application. \n\n\n\n\nIn the above application, the file 
splitter creates block metadata for files which are sent to the block reader. 
Partitions of the block reader parse the 
file blocks for records which are filtered, transformed and then persisted to a 
file (created per block). Therefore the block reader is parallel partitioned with 
the 2 downstream operators - filter/converter and record output operator. The 
code which implements this DAG is below.\n\n\npublic class ExampleApplication 
implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG 
dag, Configuration configuration)\n  {\n    FileSplitterInput input = 
dag.addOperator(\nFile-splitter\n, new FileSplitterInput());\n    //any 
concrete implementation of AbstractFSBlockReader based on the use-case can be 
added here.\n    LineReader blockReader = dag.addOperator(\nBlock-reader\n, new 
LineReader());\n    Filter filter = dag.addOperator(\nFilter\n, new 
Filter());\n    RecordOutputOperator recordOutputOperator = 
dag.addOperator(\nRecord-writer\n, new RecordOutputOperator());\n\n    
dag.addStream(\nfile-block 
metadata\n, input.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    
dag.addStream(\nrecords\n, blockReader.messages, filter.input);\n    
dag.addStream(\nfiltered-records\n, filter.output, 
recordOutputOperator.input);\n  }\n\n  /**\n   * Concrete implementation of 
{@link AbstractFSBlockReader} for which a record is a line in the file.\n   
*/\n  public static class LineReader extends 
AbstractFSBlockReader.AbstractFSReadAheadLineReader\nString\n\n  {\n\n    
@Override\n    protected String convertToRecord(byte[] bytes)\n    {\n      
return new String(bytes);\n    }\n  }\n\n  /**\n   * Considers any line 
starting with a '.' as invalid. Emits the valid records.\n   */\n  public 
static class Filter extends BaseOperator\n  {\n    public final transient 
DefaultOutputPort\nAbstractBlockReader.ReaderRecord\nString\n output = new 
DefaultOutputPort\n();\n    public final transient 
DefaultInputPort\nAbstractBlockReader.ReaderRecord\nString\n input = new 
DefaultInputPort\nAbstractBlockReader.ReaderRecord\nString\n()\n    {\n      
@Override\n      public void process(AbstractBlockReader.ReaderRecord\nString\n 
stringRecord)\n      {\n        //filter records and transform\n        //if 
the string starts with a '.' ignore the string.\n        if 
(!StringUtils.startsWith(stringRecord.getRecord(), \n.\n)) {\n          
output.emit(stringRecord);\n        }\n      }\n    };\n  }\n\n  /**\n   * 
Persists the valid records to corresponding block files.\n   */\n  public 
static class RecordOutputOperator extends 
AbstractFileOutputOperator\nAbstractBlockReader.ReaderRecord\nString\n\n  {\n   
 @Override\n    protected String 
getFileName(AbstractBlockReader.ReaderRecord\nString\n tuple)\n    {\n      
return Long.toHexString(tuple.getBlockId());\n    }\n\n    @Override\n    
protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord\nString\n 
tuple)\n    {\n      return 
tuple.getRecord().getBytes();\n    }\n  }\n}\n\n\n\n\nConfiguration to parallel 
partition block reader 
with its downstream operators.\n\n\n  \nproperty\n\n    
\nname\ndt.operator.Filter.port.input.attr.PARTITION_PARALLEL\n/name\n\n    
\nvalue\ntrue\n/value\n\n  \n/property\n\n  \nproperty\n\n    
\nname\ndt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL\n/name\n\n 
   \nvalue\ntrue\n/value\n\n  
\n/property\n\n\n\n\n\nAbstractFSReadAheadLineReader\n\n\nThis extension of 
\nAbstractFSBlockReader\n parses lines from a block and binds the 
\nreaderContext\n field to an instance of 
\nReaderContext.ReadAheadLineReaderContext\n.\n\n\nIt is abstract because it 
doesn't provide an implementation of \nconvertToRecord(byte[] bytes)\n since 
the user may want to convert the bytes that make a line into some other type. 
\n\n\nReadAheadLineReaderContext\n\n\nIn order to handle a line split across 
adjacent blocks, ReadAheadLineReaderContext always reads beyond the block 
boundary and ignores the bytes till the first end-of-line character of all the 
blocks except the first block of the file. This 
ensures that no line is missed or incomplete.\n\n\nThis is one of the most 
common ways of handling a split record. It doesn't require any further 
information to decide if a line is complete. However, the cost of this 
consistent way to handle a line split is that it always reads from the next 
block.\n\n\nAbstractFSLineReader\n\n\nSimilar to 
\nAbstractFSReadAheadLineReader\n, this also parses lines from a block. 
However, it binds the \nreaderContext\n field to an instance of 
\nReaderContext.LineReaderContext\n.\n\n\nLineReaderContext\n\n\nThis handles 
the line split differently from \nReadAheadLineReaderContext\n. It doesn't 
always read from the next block. If the end of the last line is aligned with 
the block boundary then it stops processing the block. It does read from the 
next block when the boundaries are not aligned, that is, the last line extends 
beyond the block boundary. The result of this is an inconsistency in reading the 
next block.\n\n\nWhen the boundary of the last line of 
the previous block was aligned with its block, then the first line of the 
current block is a valid line. However, in the other case the bytes from the 
block start offset to the first end-of-line character should be ignored. 
Therefore, this means that any record formed by this reader context has to be 
validated. For example, if the lines are of fixed size then size of each record 
can be validated or if each line begins with a special field then that 
knowledge can be used to check if a record is complete.\n\n\nIf the validation 
of completeness fails for a line, then \nconvertToRecord(byte[] bytes)\n should 
return null.\n\n\nFSSliceReader\n\n\nA concrete extension of 
\nAbstractFSBlockReader\n that reads fixed-size \nbyte[]\n from a block and 
emits the byte array wrapped in 
\ncom.datatorrent.netlet.util.Slice\n.\n\n\nThis operator binds the 
\nreaderContext\n to an instance of 
\nReaderContext.FixedBytesReaderContext\n.\n\n\nFixedBytesReaderContext\n\n\nThis 
implementation of \nReaderContext\n never reads beyond a block boundary, which can 
result in the last \nbyte[]\n of a block being shorter than the rest of the 
records.\n\n\nConfiguration\n\n\nreaderContext.length\n: length of each record. 
By default, this is initialized to the default HDFS block 
size.\n\n\nPartitioner and StatsListener\n\n\nThe logical instance of the block 
reader acts as the Partitioner (unless a custom partitioner is set using the 
operator attribute - \nPARTITIONER\n) as well as a StatsListener. This is 
because the \n\nAbstractBlockReader\n implements both the 
\ncom.datatorrent.api.Partitioner\n and \ncom.datatorrent.api.StatsListener\n 
interfaces and provides an implementation of \ndefinePartitions(...)\n and 
\nprocessStats(...)\n which make it auto-scalable.\n\n\nprocessStats \n\n\nThe 
application master invokes \nResponse processStats(BatchedOperatorStats 
stats)\n method on the logical instance with the 
 stats (\ntuplesProcessedPSMA\n, \ntuplesEmittedPSMA\n, \nlatencyMA\n, etc.) of 
each partition. The data which this operator is interested in is the 
\nqueueSize\n of the input port \nblocksMetadataInput\n.\n\n\nUsually the 
\nqueueSize\n of an input port gives the count of waiting control tuples plus 
data tuples. However, if a stats listener is interested only in the count of 
data tuples then that can be expressed by annotating the class with 
\n@DataQueueSize\n. In this case \nAbstractBlockReader\n itself is the 
\nStatsListener\n which is why it is annotated with 
\n@DataQueueSize\n.\n\n\nThe logical instance caches the queue size per 
partition and at regular intervals (configured by \nintervalMillis\n) sums 
these values to find the total backlog which is then used to decide whether 
re-partitioning is needed. The flow-diagram below describes this 
logic.\n\n\n\n\nThe goal of this logic is to create as many partitions as needed, within 
bounds (see \nmaxReaders\n and \nminReaders\n above), to quickly 
reduce this backlog or, if the backlog is small, to remove any idle 
partitions.\n\n\ndefinePartitions\n\n\nBased on the \nrepartitionRequired\n 
field of the \nResponse\n object which is returned by \nprocessStats\n method, 
the application master invokes 
\n\n\nCollection\nPartition\nAbstractBlockReader\n...\n 
definePartitions(Collection\nPartition\nAbstractBlockReader\n...\n partitions, 
PartitioningContext context)\n\n\n\n\non the logical instance which is also the 
partitioner instance. The implementation calculates the difference between 
required partitions and the existing count of partitions. If this difference is 
negative, then an equivalent number of partitions is removed; otherwise new 
partitions are created. \n\n\nPlease note that auto-scaling can be disabled by 
setting \ncollectStats\n to \nfalse\n. If the use-case requires only static 
partitioning, then that can be achieved by setting \nStatelessPartitioner\n as 
the operator attribute- \nPARTITIONER\n on the block reader.", 
+            "title": "Block Reader"
+        }, 
+        {
+            "location": "/operators/block_reader/#block-reader", 
+            "text": "This is a scalable operator that reads and parses blocks 
of data sources into records. A data source can be a file or a message bus that 
contains records and a block defines a chunk of data in the source by 
specifying the block offset and the length of the source belonging to the 
block.", 
+            "title": "Block Reader"
+        }, 
+        {
+            "location": "/operators/block_reader/#why-is-it-needed", 
+            "text": "A Block Reader is needed to parallelize reading and 
parsing of a single data source, for example a file. Simple parallelism of 
reading data sources can be achieved by multiple partitions reading different 
sources of the same type (for files see  AbstractFileInputOperator ), but Block 
Reader partitions can read blocks of the same source in parallel and parse them for 
records, ensuring that no record is duplicated or missed.", 
+            "title": "Why is it needed?"
+        }, 
+        {
+            "location": "/operators/block_reader/#class-diagram", 
+            "text": "", 
+            "title": "Class Diagram"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstractblockreader", 
+            "text": "This is the abstract implementation that serves as the 
base for different types of data sources. It defines how block metadata is 
processed. The flow diagram below describes the processing of block 
metadata.", 
+            "title": "AbstractBlockReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#ports", 
+            "text": "blocksMetadataInput: input port on which block metadata 
are received.    blocksMetadataOutput: output port on which block metadata are 
emitted if the port is connected. This port is useful when a downstream 
operator that receives records from block reader may also be interested to know 
the details of the corresponding blocks.    messages: output port on which 
tuples of type  com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord  
are emitted. This class encapsulates a  record  and the  blockId  of the 
corresponding block.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/block_reader/#readercontext", 
+            "text": "This is one of the most important fields in the block 
reader. It is of type  com.datatorrent.lib.io.block.ReaderContext  and is 
responsible for fetching bytes that make a record. It also lets the reader know 
how many total bytes were consumed which may not be equal to the total bytes in 
a record because consumed bytes also include bytes for the record delimiter 
which may not be a part of the actual record.  Once the reader creates an input 
stream for the block (or reuses the previously opened stream if the current block 
is the successor of the previous block) it initializes the reader context by 
invoking  readerContext.initialize(stream, blockMetadata, consecutiveBlock); . 
The initialize method is where any implementation of  ReaderContext  can perform 
all the operations which have to be executed just before reading the block or 
create states which are used during the lifetime of reading the block.  Once 
the initialization is done,  readerContext.next()  is called repeatedly 
until it returns  null . It is left to the  ReaderContext  implementations 
to decide when a block is completely processed. In cases when a record is split 
across adjacent blocks, reader context may decide to read ahead of the current 
block boundary to completely fetch the split record (examples-  
LineReaderContext  and  ReadAheadLineReaderContext ). In other cases when there 
isn't a possibility of split record (example-  FixedBytesReaderContext ), it 
returns  null  immediately when the block boundary is reached. The return type 
of  readerContext.next()  is of type  
com.datatorrent.lib.io.block.ReaderContext.Entity  which is just a wrapper for 
a  byte[]  that represents the record and total bytes used in fetching the 
record.", 
+            "title": "readerContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstract-methods", 
+            "text": "STREAM setupStream(B block) : creating a stream for a 
block is dependent on the type of source which is not known to 
AbstractBlockReader. Sub-classes which deal with a specific data source provide 
this implementation.    R convertToRecord(byte[] bytes) : this converts the 
array of bytes into the actual instance of record type.", 
+            "title": "Abstract methods"
+        }, 
+        {
+            "location": "/operators/block_reader/#auto-scalability", 
+            "text": "Block reader can auto-scale, that is, depending on the 
backlog (total number of all the blocks which are waiting in the  
blocksMetadataInput  port queue of all partitions) it can create more 
partitions or reduce them. Details are discussed in the last section which 
covers the  partitioner and stats-listener .", 
+            "title": "Auto-scalability"
+        }, 
+        {
+            "location": "/operators/block_reader/#configuration", 
+            "text": "maxReaders : when auto-scaling is enabled, this controls 
the maximum number of block reader partitions that can be created.  minReaders 
: when auto-scaling is enabled, this controls the minimum number of block 
reader partitions that should always exist.  collectStats : this enables or 
disables auto-scaling. When it is set to  true  the stats (number of blocks in 
the queue) are collected and this triggers partitioning; otherwise auto-scaling 
is disabled.  intervalMillis : when auto-scaling is enabled, this specifies the 
interval at which the reader will trigger the logic of computing the backlog 
and auto-scale.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/block_reader/#example-application", 
+            "text": "This simple dag demonstrates how any concrete 
implementation of  AbstractFSBlockReader  can be plugged into an application.   
 In the above application, file splitter creates block metadata for files which 
are sent to the block reader. Partitions of the block reader parse the file blocks 
for records which are filtered, transformed and then persisted to a file 
(created per block). Therefore block reader is parallel partitioned with the 2 
downstream operators - filter/converter and record output operator. The code 
which implements this dag is below.  public class ExampleApplication implements 
StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, 
Configuration configuration)\n  {\n    FileSplitterInput input = 
dag.addOperator(\"File-splitter\", new FileSplitterInput());\n    //any concrete 
implementation of AbstractFSBlockReader based on the use-case can be added 
here.\n    LineReader blockReader = dag.addOperator(\"Block-reader\", new 
LineReader());\n    Filter filter = dag.addOperator(\"Filter\", new Filter());\n    
RecordOutputOperator recordOutputOperator = dag.addOperator(\"Record-writer\", 
new RecordOutputOperator());\n\n    dag.addStream(\"file-block metadata\", 
input.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    
dag.addStream(\"records\", blockReader.messages, filter.input);\n    
dag.addStream(\"filtered-records\", filter.output, recordOutputOperator.input);\n 
 }\n\n  /**\n   * Concrete implementation of {@link AbstractFSBlockReader} for 
which a record is a line in the file.\n   */\n  public static class LineReader 
extends AbstractFSBlockReader.AbstractFSReadAheadLineReader<String>\n  {\n\n    
@Override\n    protected String convertToRecord(byte[] bytes)\n    {\n      
return new String(bytes);\n    }\n  }\n\n  /**\n   * Considers any line 
starting with a '.' as invalid. Emits the valid records.\n   */\n  public 
static class Filter extends BaseOperator\n  {\n    public final transient 
DefaultOutputPort<AbstractBlockReader.ReaderRecord<String>> output = new 
DefaultOutputPort<>();\n    public final transient 
DefaultInputPort<AbstractBlockReader.ReaderRecord<String>> input = new 
DefaultInputPort<AbstractBlockReader.ReaderRecord<String>>()\n    {\n      
@Override\n      public void process(AbstractBlockReader.ReaderRecord<String> 
stringRecord)\n      {\n        //filter records and transform\n        //if 
the string starts with a '.' ignore the string.\n        if 
(!StringUtils.startsWith(stringRecord.getRecord(), \".\")) {\n          
output.emit(stringRecord);\n        }\n      }\n    };\n  }\n\n  /**\n   * 
Persists the valid records to corresponding block files.\n   */\n  public 
static class RecordOutputOperator extends 
AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<String>>\n  {\n    
@Override\n    protected String 
getFileName(AbstractBlockReader.ReaderRecord<String> tuple)\n    {\n      
return Long.toHexString(tuple.getBlockId());\n    }\n\n    @Override\n    
protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<String> 
tuple)\n    {\n      return tuple.getRecord().getBytes();\n    }\n  }\n}  
Configuration to parallel partition block reader with its downstream 
operators.    <property>\n    
<name>dt.operator.Filter.port.input.attr.PARTITION_PARALLEL</name>\n    
<value>true</value>\n  </property>\n  <property>\n    
<name>dt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL</name>\n    
<value>true</value>\n  </property>", 
+            "title": "Example Application"
+        }, 
+        {
+            "location": 
"/operators/block_reader/#abstractfsreadaheadlinereader", 
+            "text": "This extension of  AbstractFSBlockReader  parses lines 
from a block and binds the  readerContext  field to an instance of  
ReaderContext.ReadAheadLineReaderContext .  It is abstract because it doesn't 
provide an implementation of  convertToRecord(byte[] bytes)  since the user may 
want to convert the bytes that make a line into some other type.", 
+            "title": "AbstractFSReadAheadLineReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#readaheadlinereadercontext", 
+            "text": "In order to handle a line split across adjacent blocks, 
ReadAheadLineReaderContext always reads beyond the block boundary and ignores 
the bytes till the first end-of-line character of all the blocks except the 
first block of the file. This ensures that no line is missed or incomplete.  
This is one of the most common ways of handling a split record. It doesn't 
require any further information to decide if a line is complete. However, the 
cost of this consistent way to handle a line split is that it always reads from 
the next block.", 
+            "title": "ReadAheadLineReaderContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstractfslinereader", 
+            "text": "Similar to  AbstractFSReadAheadLineReader , this also 
parses lines from a block. However, it binds the  readerContext  field to an 
instance of  ReaderContext.LineReaderContext .", 
+            "title": "AbstractFSLineReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#linereadercontext", 
+            "text": "This handles the line split differently from  
ReadAheadLineReaderContext . It doesn't always read from the next block. If the 
end of the last line is aligned with the block boundary then it stops 
processing the block. It does read from the next block when the boundaries are 
not aligned, that is, the last line extends beyond the block boundary. The result 
of this is an inconsistency in reading the next block.  When the boundary of 
the last line of the previous block was aligned with its block, then the first 
line of the current block is a valid line. However, in the other case the bytes 
from the block start offset to the first end-of-line character should be 
ignored. Therefore, this means that any record formed by this reader context 
has to be validated. For example, if the lines are of fixed size then size of 
each record can be validated or if each line begins with a special field then 
that knowledge can be used to check if a record is complete.  If the 
validation of completeness fails for a line then  convertToRecord(byte[] bytes)  should 
return null.", 
+            "title": "LineReaderContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#fsslicereader", 
+            "text": "A concrete extension of  AbstractFSBlockReader  that 
reads fixed-size  byte[]  from a block and emits the byte array wrapped in  
com.datatorrent.netlet.util.Slice .  This operator binds the  readerContext  to 
an instance of  ReaderContext.FixedBytesReaderContext .", 
+            "title": "FSSliceReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#fixedbytesreadercontext", 
+            "text": "This implementation of  ReaderContext  never reads beyond 
a block boundary, so the last  byte[]  of a block may be shorter than the 
rest of the records.", 
+            "title": "FixedBytesReaderContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#configuration_1", 
+            "text": "readerContext.length : length of each record. By default, 
this is initialized to the default HDFS block size.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": 
"/operators/block_reader/#partitioner-and-statslistener", 
+            "text": "The logical instance of the block reader acts as the 
Partitioner (unless a custom partitioner is set using the operator attribute -  
PARTITIONER ) as well as a StatsListener. This is because the  
AbstractBlockReader  implements both the  com.datatorrent.api.Partitioner  and  
com.datatorrent.api.StatsListener  interfaces and provides an implementation of 
 definePartitions(...)  and  processStats(...)  which make it auto-scalable.", 
+            "title": "Partitioner and StatsListener"
+        }, 
+        {
+            "location": "/operators/block_reader/#processstats", 
+            "text": "The application master invokes  Response 
processStats(BatchedOperatorStats stats)  method on the logical instance with 
the stats ( tuplesProcessedPSMA ,  tuplesEmittedPSMA ,  latencyMA , etc.) of 
each partition. The data which this operator is interested in is the  queueSize 
 of the input port  blocksMetadataInput .  Usually the  queueSize  of an input 
port gives the count of waiting control tuples plus data tuples. However, if a 
stats listener is interested only in the count of data tuples then that can be 
expressed by annotating the class with  @DataQueueSize . In this case  
AbstractBlockReader  itself is the  StatsListener  which is why it is annotated 
with  @DataQueueSize .  The logical instance caches the queue size per 
partition and at regular intervals (configured by  intervalMillis ) sums these 
values to find the total backlog which is then used to decide whether 
re-partitioning is needed. The flow-diagram below describes this logic.   The 
goal of this logic is to create as many partitions as the bounds allow (see  
maxReaders  and  minReaders  above) to quickly reduce this backlog or, if the 
backlog is small, to remove any idle partitions.", 
+            "title": "processStats "
+        }, 
+        {
+            "location": "/operators/block_reader/#definepartitions", 
+            "text": "Based on the  repartitionRequired  field of the  Response 
 object which is returned by  processStats  method, the application master 
invokes   Collection<Partition<AbstractBlockReader<...>>>  
definePartitions(Collection<Partition<AbstractBlockReader<...>>>  partitions, 
PartitioningContext context)  on the logical instance which is also the 
partitioner instance. The implementation calculates the difference between 
required partitions and the existing count of partitions. If this difference is 
negative, then an equivalent number of partitions is removed; otherwise new 
partitions are created.   Please note auto-scaling can be disabled by setting  
collectStats  to  false . If the use-case requires only static partitioning, 
then that can be achieved by setting  StatelessPartitioner  as the operator 
attribute-  PARTITIONER  on the block reader.", 
+            "title": "definePartitions"
+        }, 
+        {
+            "location": "/operators/csvformatter/", 
+            "text": "CsvFormatter\n\n\nOperator Objective\n\n\nThis operator 
receives a POJO (\nPlain Old Java Object\n) as an incoming tuple, converts the 
data in \nthe incoming POJO to a custom delimited string and emits the 
delimited string.\n\n\nCsvFormatter supports schema definition as a JSON 
string. \n\n\nCsvFormatter does not hold any state and is \nidempotent\n, 
\nfault-tolerant\n and \nstatically/dynamically partitionable\n.\n\n\nOperator 
Information\n\n\n\n\nOperator location: \nmalhar-contrib\n\n\nAvailable since: 
\n3.2.0\n\n\nOperator state: \nEvolving\n\n\nJava Packages:\n\n\nOperator: 
\ncom.datatorrent.contrib.formatter.CsvFormatter\n\n\n\n\n\n\n\n\nProperties, 
Attributes and Ports\n\n\nProperties of 
CsvFormatter\n\n\n\n\n\n\n\n\nProperty\n\n\nDescription\n\n\nType\n\n\nMandatory\n\n\nDefault
 Value\n\n\n\n\n\n\n\n\n\n\nschema\n\n\nContents of the schema. The schema is 
specified in JSON 
format.\n\n\nString\n\n\nYes\n\n\nN/A\n\n\n\n\n\n\n\n\nPlatform Attributes that 
influences operator 
behavior\n\n\n\n\n\n\n\n\nAttribute\n\n\nDescription\n\n\nType\n\n\nMandatory\n\n\n\n\n\n\n\n\n\n\nin.TUPLE_CLASS\n\n\nTUPLE_CLASS
 attribute on input port which tells operator the class of POJO which will be 
incoming\n\n\nClass or 
FQCN\n\n\nYes\n\n\n\n\n\n\n\n\nPorts\n\n\n\n\n\n\n\n\nPort\n\n\nDescription\n\n\nType\n\n\nMandatory\n\n\n\n\n\n\n\n\n\n\nin\n\n\nTuples
 which need to be formatted are received on this port\n\n\nObject 
(POJO)\n\n\nYes\n\n\n\n\n\n\nout\n\n\nTuples that are formatted are emitted 
from this port\n\n\nString\n\n\nNo\n\n\n\n\n\n\nerr\n\n\nTuples that could not 
be converted are emitted on this 
port\n\n\nObject\n\n\nNo\n\n\n\n\n\n\n\n\nLimitations\n\n\nThe current CsvFormatter 
has the following limitations:\n\n\n\n\nThe field names in the schema and the POJO 
field names should match. For example, if the name of the schema field is 
\"customerName\", then the POJO should contain a field with the same name. 
\n\n\nField-wise validation/formatting is not yet supported.\n\n\nThe fields 
will be written to the file in the same order as specified in 
schema.json\n\n\n\n\nExample\n\n\nExample for CsvFormatter can be found at: 
\nhttps://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter\n\n\nAdvanced\n\n\n
 Schema format for CsvFormatter\n\n\nCsvFormatter expects schema to be a String 
in JSON format:\n\n\nExample for format of schema:\n\n\n{\n  \"separator\": 
\",\",\n  \"quoteChar\": \"\\\"\",\n  \"lineDelimiter\": \"\\n\",\n  \"fields\": 
[\n    {\n      \"name\": \"campaignId\",\n      \"type\": \"Integer\"\n    
},\n    {\n      \"name\": \"startDate\",\n      \"type\": \"Date\",\n      
\"constraints\": {\n        \"format\": \"yyyy-MM-dd\"\n      }\n    }\n    
]\n}\n\n\n\n\nPartitioning of CsvFormatter\n\n\nBecause CsvFormatter is a 
stateless operator, the built-in partitioners in the Malhar library can be used 
directly by setting properties as follows:\n\n\nStateless partitioning of 
CsvFormatter\n\n\nStateless partitioning ensures that CsvFormatter is 
partitioned right at the start of the application and remains partitioned 
throughout the lifetime of the DAG.\nCsvFormatter can be statelessly 
partitioned by adding the following lines to properties.xml:\n\n\n  
<property>\n    
<name>dt.operator.{OperatorName}.attr.PARTITIONER</name>\n    
<value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>\n  
</property>\n\n\n\n\nwhere {OperatorName} is the name of the CsvFormatter 
operator.\nThe lines above create 2 static partitions of CsvFormatter. The 
value can be changed to change the number of static 
partitions.\n\n\nDynamic Partitioning of CsvFormatter\n\n\nDynamic partitioning 
is a feature of the Apex platform that changes the partitioning of the operator 
based on certain conditions.\nCsvFormatter can be dynamically partitioned using 
the out-of-the-box partitioner below:\n\n\nThroughput based\n\n\nThe following 
code can be added to the populateDAG method of the application to dynamically 
partition CsvFormatter:\n\n\n    
StatelessThroughputBasedPartitioner<CsvFormatter> partitioner = new 
StatelessThroughputBasedPartitioner<>();\n    
partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));\n    
partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));\n    
partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));\n    
dag.setAttribute(csvFormatter, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{partitioner}));\n    
dag.setAttribute(csvFormatter, OperatorContext.PARTITIONER, 
partitioner);\n\n\n\n\nThe above code dynamically partitions CsvFormatter when 
throughput changes.\nIf the overall throughput of CsvFormatter rises above 30000 
or falls below 10000, the platform will repartition CsvFormatter \nto bring the 
throughput of a single partition back between 10000 and 30000.\nThe 
CooldownMillis value of 10000 is the threshold time over which a throughput 
change is observed.", 
+            "title": "CSV Formatter"
+        }, 
+        {
+            "location": "/operators/csvformatter/#csvformatter", 
+            "text": "", 
+            "title": "CsvFormatter"
+        }, 
+        {
+            "location": "/operators/csvformatter/#operator-objective", 
+            "text": "This operator receives a POJO ( Plain Old Java Object ) 
as an incoming tuple, converts the data in \nthe incoming POJO to a custom 
delimited string and emits the delimited string.  CsvFormatter supports schema 
definition as a JSON string.   CsvFormatter does not hold any state and is  
idempotent ,  fault-tolerant  and  statically/dynamically partitionable .", 
+            "title": "Operator Objective"
+        }, 
+        {
+            "location": "/operators/csvformatter/#operator-information", 
+            "text": "Operator location:  malhar-contrib  Available since:  
3.2.0  Operator state:  Evolving  Java Packages:  Operator:  
com.datatorrent.contrib.formatter.CsvFormatter", 
+            "title": "Operator Information"
+        }, 
+        {
+            "location": 
"/operators/csvformatter/#properties-attributes-and-ports", 
+            "text": "", 
+            "title": "Properties, Attributes and Ports"
+        }, 
+        {
+            "location": 
"/operators/csvformatter/#platform-attributes-that-influences-operator-behavior",
 
+            "text": "Attribute  Description  Type  Mandatory      
in.TUPLE_CLASS  TUPLE_CLASS attribute on input port which tells operator the 
class of POJO which will be incoming  Class or FQCN  Yes", 
+            "title": "Platform Attributes that influences operator behavior"
+        }, 
+        {
+            "location": "/operators/csvformatter/#ports", 
+            "text": "Port  Description  Type  Mandatory      in  Tuples which 
need to be formatted are received on this port  Object (POJO)  Yes    out  
Tuples that are formatted are emitted from this port  String  No    err  Tuples 
that could not be converted are emitted on this port  Object  No", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/csvformatter/#limitations", 
+            "text": "The current CsvFormatter has the following limitations:   The 
field names in the schema and the POJO field names should match. For example, if 
the name of the schema field is \"customerName\", then the POJO should contain a 
field with the same name.   Field-wise validation/formatting is not yet 
supported.  The fields will be written to the file in the same order as 
specified in schema.json", 
+            "title": "Limitations"
+        }, 
+        {
+            "location": "/operators/csvformatter/#example", 
+            "text": "Example for CsvFormatter can be found at:  
https://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter", 
+            "title": "Example"
+        }, 
+        {
+            "location": "/operators/csvformatter/#advanced", 
+            "text": "", 
+            "title": "Advanced"
+        }, 
+        {
+            "location": 
"/operators/csvformatter/#partitioning-of-csvformatter", 
+            "text": "Because CsvFormatter is a stateless operator, the 
built-in partitioners in the Malhar library can be used directly by setting 
properties as follows:", 
+            "title": "Partitioning of CsvFormatter"
+        }, 
+        {
+            "location": 
"/operators/csvformatter/#stateless-partioning-of-csvformatter", 
+            "text": "Stateless partitioning ensures that CsvFormatter is 
partitioned right at the start of the application and remains partitioned 
throughout the lifetime of the DAG.\nCsvFormatter can be statelessly 
partitioned by adding the following lines to properties.xml:    <property>\n    
<name>dt.operator.{OperatorName}.attr.PARTITIONER</name>\n    
<value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>\n  
</property>  where {OperatorName} is the name of the CsvFormatter operator.\nThe 
lines above create 2 static partitions of CsvFormatter. The value can be 
changed to change the number of static partitions.", 
+            "title": "Stateless partitioning of CsvFormatter"
+        }, 
+        {
+            "location": 
"/operators/csvformatter/#dynamic-partitioning-of-csvformatter", 
+            "text": "Dynamic partitioning is a feature of the Apex platform that 
changes the partitioning of the operator based on certain 
conditions.\nCsvFormatter can be dynamically partitioned using the 
out-of-the-box partitioner below:", 
+            "title": "Dynamic Partitioning of CsvFormatter"
+        }, 
+        {
+            "location": "/operators/csvformatter/#throughput-based", 
+            "text": "The following code can be added to the populateDAG method of 
the application to dynamically partition CsvFormatter:      
StatelessThroughputBasedPartitioner<CsvFormatter> partitioner = new 
StatelessThroughputBasedPartitioner<>();\n    
partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));\n    
partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));\n    
partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));\n    
dag.setAttribute(csvFormatter, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{partitioner}));\n    
dag.setAttribute(csvFormatter, OperatorContext.PARTITIONER, partitioner);  
The above code dynamically partitions CsvFormatter when throughput changes.\nIf 
the overall throughput of CsvFormatter rises above 30000 or falls below 10000, the 
platform will repartition CsvFormatter \nto bring the throughput of a single 
partition back between 10000 and 30000.\nThe CooldownMillis value of 10000 is the 
threshold time over which a throughput change is observed.", 
+            "title": "Throughput based"
+        }, 
+        {
+            "location": "/operators/csvParserOperator/", 
+            "text": "Csv Parser Operator\n\n\nOperator Objective\n\n\nThis 
operator is designed to parse delimited records and construct a map or a concrete 
Java class, also known as a \n\"POJO\"\n, out of each record. The user needs to 
provide a schema that describes the delimited data. Based on the schema 
definition, the operator parses the incoming record into a map and a POJO.  The 
user can also provide constraints, if any, in the schema. The supported 
constraints are listed in the \nconstraints table\n. The incoming record is 
validated against those constraints. Valid records are emitted as a POJO / map 
while invalid ones are emitted on the error port with an error 
message.\n\n\nNote\n: field names of the POJO must match the field names in the 
schema and appear in the same order as in the incoming 
data.\n\n\nOverview\n\n\nThe operator is \nidempotent\n, \nfault-tolerant\n and 
\npartitionable\n.\n\n\nClass Diagram\n\n\n\n\nOperator 
Information\n\n\n\n\nOperator location:\nmalhar-contrib\n\n\nAvailable 
since:\n3.2.0\n\n\nOperator state:\nEvolving\n\n\nJava 
Package:\ncom.datatorrent.contrib.parser.CsvParser\n\n\

<TRUNCATED>
