This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git

commit 658cafc31afe2cf2f3dfec5511844c96a2a7fa1a
Author: Thomas Weise <t...@apache.org>
AuthorDate: Sun Aug 6 08:59:37 2017 -0700

    Expand README and improve formatting.
---
 examples/exactly-once/README.md                    | 25 ++++++++++++++++++++--
 .../exactlyonce/ExactlyOnceJdbcOutputApp.java      | 13 ++++++++---
 .../exactlyonce/ExactlyOnceJdbcOutputTest.java     |  7 +++---
 3 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/examples/exactly-once/README.md b/examples/exactly-once/README.md
index 5254b4c..4c3f10d 100644
--- a/examples/exactly-once/README.md
+++ b/examples/exactly-once/README.md
@@ -1,8 +1,27 @@
 # Examples for end-to-end exactly-once
 
+The examples are a variation of word count to illustrate end-to-end 
exactly-once processing
+by incorporating the external system integration aspect, which needs to be 
taken into account when
+developing real-world pipelines:
+
+* Read from Kafka source
+* Windowed count aggregation that emits incremental aggregates
+* Sink that maintains totals accumulating the incremental aggregates (shown 
for JDBC and file output)
+
+The examples combine the 3 properties that are required for end-to-end 
exactly-once results:
+
+1. At-least-once processing that guarantees no loss of data
+2. Idempotency in the DAG (Kafka input operator and repeatable/deterministic 
streaming windows)
+3. Consistent state between DAG and external system, enabled by the output 
operators.
+
+The test cases show how the applications can be configured to run in embedded 
mode (including Kafka).
+
 ## Read from Kafka, write to JDBC
 
-This application shows exactly-once output to JDBC through transactions:
+Shows exactly-once output to JDBC through transactions. The JDBC output 
operator
+keeps track of the streaming window along with the count to avoid duplicate 
writes on replay
+during recovery. This is an example of continuously updating results in the database,
+enabled by the transactions.
 
 
[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java)
 
@@ -10,7 +29,9 @@ This application shows exactly-once output to JDBC through 
transactions:
 
 ## Read from Kafka, write to Files
 
-This application shows exactly-once output to HDFS through atomic file 
operation:
+This application shows exactly-once output to files through atomic file operations. In contrast to the
+JDBC example, output can only occur once the final count is computed. This 
implies batching at the sink,
+leading to high latency.
 
 
[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java)
 
diff --git 
a/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
 
b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
index 33ae9dc..6982833 100644
--- 
a/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
+++ 
b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
@@ -48,7 +48,8 @@ public class ExactlyOnceJdbcOutputApp implements 
StreamingApplication
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    KafkaSinglePortStringInputOperator kafkaInput = 
dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
+    KafkaSinglePortStringInputOperator kafkaInput = 
dag.addOperator("kafkaInput",
+        new KafkaSinglePortStringInputOperator());
     kafkaInput.setWindowDataManager(new FSWindowDataManager());
     UniqueCounterFlat count = dag.addOperator("count", new 
UniqueCounterFlat());
     CountStoreOperator store = dag.addOperator("store", new 
CountStoreOperator());
@@ -58,7 +59,8 @@ public class ExactlyOnceJdbcOutputApp implements 
StreamingApplication
     dag.addStream("counts", count.counts, store.input, cons.input);
   }
 
-  public static class CountStoreOperator extends 
AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
+  public static class CountStoreOperator
+      extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, 
Integer>>
   {
     public static final String SQL =
         "MERGE INTO words USING (VALUES ?, ?) I (word, wcount)"
@@ -73,13 +75,18 @@ public class ExactlyOnceJdbcOutputApp implements 
StreamingApplication
     }
 
     @Override
-    protected void setStatementParameters(PreparedStatement statement, 
KeyValPair<String, Integer> tuple) throws SQLException
+    protected void setStatementParameters(PreparedStatement statement,
+        KeyValPair<String, Integer> tuple) throws SQLException
     {
       statement.setString(1, tuple.getKey());
       statement.setInt(2, tuple.getValue());
     }
   }
 
+  /**
+   * Extension of {@link UniqueCounter} that emits individual key/value pairs instead
+   * of a map with all modified values.
+   */
   public static class UniqueCounterFlat extends UniqueCounter<String>
   {
     public final transient DefaultOutputPort<KeyValPair<String, Integer>> 
counts = new DefaultOutputPort<>();
diff --git 
a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
 
b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
index 62bfb74..5457ec5 100644
--- 
a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
+++ 
b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
@@ -76,19 +76,20 @@ public class ExactlyOnceJdbcOutputTest
     Connection con = DriverManager.getConnection(DB_URL);
     Statement stmt = con.createStatement();
 
-    String createMetaTable = "CREATE TABLE IF NOT EXISTS " + 
JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+    String createMetaTable = "CREATE TABLE IF NOT EXISTS "
+        + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
         + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, 
"
         + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
         + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
         + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
-        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + 
JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", "
+        + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
         + ")";
     stmt.executeUpdate(createMetaTable);
 
     String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
         + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
     stmt.executeUpdate(createTable);
-
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <commits@apex.apache.org>.

Reply via email to