Added: samza/site/learn/documentation/versioned/api/test-framework.html
URL: 
http://svn.apache.org/viewvc/samza/site/learn/documentation/versioned/api/test-framework.html?rev=1855804&view=auto
==============================================================================
--- samza/site/learn/documentation/versioned/api/test-framework.html (added)
+++ samza/site/learn/documentation/versioned/api/test-framework.html Tue Mar 19 
05:31:11 2019
@@ -0,0 +1,967 @@
+<!DOCTYPE html>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<html lang="en">
+
+<head>
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1, 
shrink-to-fit=no">
+  <meta charset="utf-8">
+  <title>Samza - Testing Samza jobs: Integration Framework</title>
+  <link rel="apple-touch-icon-precomposed" sizes="57x57" 
href="/img/favicon/apple-touch-icon-57x57.png" />
+  <link rel="apple-touch-icon-precomposed" sizes="114x114" 
href="/img/favicon/apple-touch-icon-114x114.png" />
+  <link rel="apple-touch-icon-precomposed" sizes="72x72" 
href="/img/favicon/apple-touch-icon-72x72.png" />
+  <link rel="apple-touch-icon-precomposed" sizes="144x144" 
href="/img/favicon/apple-touch-icon-144x144.png" />
+  <link rel="apple-touch-icon-precomposed" sizes="60x60" 
href="/img/favicon/apple-touch-icon-60x60.png" />
+  <link rel="apple-touch-icon-precomposed" sizes="120x120" 
href="/img/favicon/apple-touch-icon-120x120.png" />
+  <link rel="apple-touch-icon-precomposed" sizes="76x76" 
href="/img/favicon/apple-touch-icon-76x76.png" />
+  <link rel="apple-touch-icon-precomposed" sizes="152x152" 
href="/img/favicon/apple-touch-icon-152x152.png" />
+  <link rel="icon" type="image/png" href="/img/favicon/favicon-196x196.png" 
sizes="196x196" />
+  <link rel="icon" type="image/png" href="/img/favicon/favicon-96x96.png" 
sizes="96x96" />
+  <link rel="icon" type="image/png" href="/img/favicon/favicon-32x32.png" 
sizes="32x32" />
+  <link rel="icon" type="image/png" href="/img/favicon/favicon-16x16.png" 
sizes="16x16" />
+  <link rel="icon" type="image/png" href="/img/favicon/favicon-128.png" 
sizes="128x128" />
+  <meta name="application-name" content="https://samza.apache.org" />
+  <meta name="msapplication-TileColor" content="#FFFFFF" />
+  <meta name="msapplication-TileImage" 
content="/img/favicon/mstile-144x144.png" />
+  <meta name="msapplication-square70x70logo" 
content="/img/favicon/mstile-70x70.png" />
+  <meta name="msapplication-square150x150logo" 
content="/img/favicon/mstile-150x150.png" />
+  <meta name="msapplication-wide310x150logo" 
content="/img/favicon/mstile-310x150.png" />
+  <meta name="msapplication-square310x310logo" 
content="/img/favicon/mstile-310x310.png" />
+  <link href="/css/ionicons.min.css" rel="stylesheet">
+  <link href="/css/google-fonts.css" rel="stylesheet">
+  <link href="/css/syntax.css" rel="stylesheet"/>
+  <link rel="stylesheet" href="/css/main.new.css" />
+</head>
+
+<body class="page">
+  <!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<div class="main-navigation" data-plugin="menu">
+  <div class="main-navigation__toggle" data-menu-closed>
+    <i class="icon ion-md-menu"></i>
+  </div>
+  <div class="main-navigation__toggle main-navigation__toggle--opened" 
data-menu-opened>
+    <i class="icon ion-md-close"></i>
+  </div>
+  <div class="main-navigation__inner">
+    <div class="main-navigation__logo">
+      <a href="/">
+        <img class="main-navigation__logo-img" src="/img/samza-logo.png" 
srcset="/img/samza-logo.png 1x, /img/samza-logo@2x.png 2x"
+          alt="Samza Logo" />
+      </a>
+    </div>
+    <div class="main-navigation__items" data-menu-opened>
+      <a class="main-navigation__item" href="/">Home</a>
+      <a class="main-navigation__item" 
href="/learn/documentation/latest/core-concepts/core-concepts.html">Docs</a>
+      <a class="main-navigation__item" href="/powered-by/">Powered By</a>
+      <a class="main-navigation__item" href="/startup/download/">Downloads</a>
+      <a class="main-navigation__item" href="/blog/">Blog</a>
+      <div class="main-navigation__item main-navigation__item--group">
+        <div class="main-navigation__item-group-title">
+          Community
+          <i class="icon ion-md-arrow-dropdown"></i>
+        </div>
+        <div class="main-navigation__item-group-list">
+          <a class="main-navigation__item" 
href="/community/contact-us.html">Contact Us</a>
+          <a class="main-navigation__item" 
href="/contribute/contributors-corner.html">Contributor's Corner</a>
+          <a class="main-navigation__item" 
href="/community/committers.html">PMC Members and committers</a>
+          <a class="main-navigation__item" href="/meetups/">Talks and 
Meetups</a>
+        </div>
+      </div>
+    </div>
+  </div>
+</div>
+
+  <div class="container">
+      <div class="container__toggle">
+        <i class="icon ion-md-arrow-dropleft-circle 
container__toggle-icon"></i>
+        <i class="icon ion-md-arrow-dropright-circle container__toggle-icon 
container__toggle-icon--opened"></i>
+      </div>
+      
+      <!-- There is only one menu, but made it as a no-output collection to 
grab data only -->
+      
+        <!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+
+<div class="side-navigation">
+
+  
+
+    
+
+    
+    <!-- Start Group -->
+    
+    <div class="side-navigation__group side-navigation__group--has-nested" 
data-plugin="sub-menu" 
data-sub-menu-show-class="side-navigation__group--has-nested-visible">
+    
+
+    <!-- Make menu_title, and start items group if needed -->
+    
+      <div class="side-navigation__group-title">
+        <i class="side-navigation__group-title-icon icon 
ion-md-arrow-dropdown"></i>
+        Getting Started
+      </div>
+      <div class="side-navigation__group-items " data-sub-menu >
+    
+
+    <!-- Handle sub navigation items from data -->
+    
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/startup/quick-start/latest/">QuickStart</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/startup/code-examples/latest/">Code Examples</a>
+      
+
+    
+
+    <!-- Handle sub navigation from site collections -->
+    
+
+    <!-- Close sub nav group -->
+    
+      </div>
+    
+
+    <!-- Close menu group -->
+    </div>
+
+  
+
+    
+
+    
+    <!-- Start Group -->
+    
+    <div class="side-navigation__group side-navigation__group--has-nested" 
data-plugin="sub-menu" 
data-sub-menu-show-class="side-navigation__group--has-nested-visible">
+    
+
+    <!-- Make menu_title, and start items group if needed -->
+    
+      <div class="side-navigation__group-title">
+        <i class="side-navigation__group-title-icon icon 
ion-md-arrow-dropdown"></i>
+        Documentation
+      </div>
+      <div class="side-navigation__group-items 
side-navigation__group-has-submenus" data-sub-menu 
data-documentation="/learn/documentation/latest/">
+    
+
+    <!-- Handle sub navigation items from data -->
+    
+
+    <!-- Handle sub navigation from site collections -->
+    
+
+    <!-- Close sub nav group -->
+    
+      </div>
+    
+
+    <!-- Close menu group -->
+    </div>
+
+  
+
+    
+
+    
+    <!-- Start Group -->
+    
+    <div class="side-navigation__group side-navigation__group--has-nested" 
data-plugin="sub-menu" 
data-sub-menu-show-class="side-navigation__group--has-nested-visible">
+    
+
+    <!-- Make menu_title, and start items group if needed -->
+    
+      <div class="side-navigation__group-title">
+        <i class="side-navigation__group-title-icon icon 
ion-md-arrow-dropdown"></i>
+        Releases
+      </div>
+      <div class="side-navigation__group-items " data-sub-menu >
+    
+
+    <!-- Handle sub navigation items from data -->
+    
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.0.0">1.0.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/0.14">0.14</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/0.13">0.13</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/0.12">0.12</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/0.11">0.11</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/0.10">0.10</a>
+      
+
+    
+
+    <!-- Handle sub navigation from site collections -->
+    
+
+    <!-- Close sub nav group -->
+    
+      </div>
+    
+
+    <!-- Close menu group -->
+    </div>
+
+  
+
+    
+
+    
+    <!-- Start Group -->
+    
+    <div class="side-navigation__group">
+    
+
+    <!-- Make menu_title, and start items group if needed -->
+    
+      <a class="side-navigation__group-title" data-plugin="top-menu" 
data-match-active="" href="/blog/">
+        Blog
+      </a>
+    
+
+    <!-- Handle sub navigation items from data -->
+    
+
+    <!-- Handle sub navigation from site collections -->
+    
+
+    <!-- Close sub nav group -->
+    
+
+    <!-- Close menu group -->
+    </div>
+
+  
+
+    
+
+    
+    <!-- Start Group -->
+    
+    <div class="side-navigation__group side-navigation__group--has-nested" 
data-plugin="sub-menu" 
data-sub-menu-show-class="side-navigation__group--has-nested-visible">
+    
+
+    <!-- Make menu_title, and start items group if needed -->
+    
+      <div class="side-navigation__group-title">
+        <i class="side-navigation__group-title-icon icon 
ion-md-arrow-dropdown"></i>
+        Community
+      </div>
+      <div class="side-navigation__group-items " data-sub-menu >
+    
+
+    <!-- Handle sub navigation items from data -->
+    
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/community/contact-us.html">Contact Us</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/contribute/contributors-corner.html">Contributor's Corner</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/contribute/enhancement-proposal.html">Enhancement Proposal</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/community/committers.html">PMC members & Committers</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/meetups/">Talks and Meetups</a>
+      
+
+    
+
+    <!-- Handle sub navigation from site collections -->
+    
+
+    <!-- Close sub nav group -->
+    
+      </div>
+    
+
+    <!-- Close menu group -->
+    </div>
+
+  
+
+    
+
+    
+    <!-- Start Group -->
+    
+    <div class="side-navigation__group side-navigation__group--has-nested" 
data-plugin="sub-menu" 
data-sub-menu-show-class="side-navigation__group--has-nested-visible">
+    
+
+    <!-- Make menu_title, and start items group if needed -->
+    
+      <div class="side-navigation__group-title">
+        <i class="side-navigation__group-title-icon icon 
ion-md-arrow-dropdown"></i>
+        Case Studies
+      </div>
+      <div class="side-navigation__group-items " data-sub-menu >
+    
+
+    <!-- Handle sub navigation items from data -->
+    
+      
+        
+      <a class="side-navigation__group-item" data-match-active="exact" 
href="/case-studies/">View All</a>
+      
+        
+          <hr>
+          
+
+    
+
+    <!-- Handle sub navigation from site collections -->
+    
+
+      
+
+      
+
+      
+
+      
+        
+          
+        
+
+        
+        
+        
+
+        
+
+        
+
+        
+
+      <a class="side-navigation__group-item" href="/case-studies/ebay" 
data-match-active="">eBay</a>
+      
+        
+          
+        
+          
+        
+          
+        
+
+        
+        
+        
+
+        
+
+        
+
+        
+
+      <a class="side-navigation__group-item" href="/case-studies/tripadvisor" 
data-match-active="">TripAdvisor</a>
+      
+        
+          
+        
+
+        
+        
+        
+
+        
+
+        
+
+        
+
+      <a class="side-navigation__group-item" href="/case-studies/slack" 
data-match-active="">Slack</a>
+      
+        
+
+        
+        
+        
+
+        
+
+        
+
+        
+
+      <a class="side-navigation__group-item" href="/case-studies/optimizely" 
data-match-active="">Optimizely</a>
+      
+        
+
+        
+        
+        
+
+        
+
+        
+
+        
+
+      <a class="side-navigation__group-item" href="/case-studies/redfin" 
data-match-active="">Redfin</a>
+      
+        
+
+        
+        
+        
+
+        
+
+        
+
+        
+
+      <a class="side-navigation__group-item" href="/case-studies/linkedin" 
data-match-active="">LinkedIn</a>
+      
+        
+          
+        
+          
+        
+          
+
+    
+
+    <!-- Close sub nav group -->
+    
+      </div>
+    
+
+    <!-- Close menu group -->
+    </div>
+
+  
+
+</div>
+
+      
+      
+      <div class="section">
+        <div class="content">
+          
+          <h2>Testing Samza jobs: Integration Framework</h2>
+          
+
+          
+
+          <div class="releases-list-container">
+
+            <span>Releases</span>
+
+            <ul class="releases-list" data-releases-list>
+              <li class="hide"><a 
href="/learn/documentation/latest/api/test-framework">latest</a></li>
+              
+              
+              
+
+              <li class="hide"><a 
href="/learn/documentation/0.14/api/test-framework">0.14</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/0.13/api/test-framework">0.13</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/0.12/api/test-framework">0.12</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/0.11/api/test-framework">0.11</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/0.10/api/test-framework">0.10</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/1.0/api/test-framework">1.0</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/0.9/api/test-framework">0.9</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/0.8/api/test-framework">0.8</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/0.7/api/test-framework">0.7</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/0.7.0/api/test-framework">0.7.0</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/-1000/api/test-framework">-1000</a></li>
+
+              
+
+            </ul>
+
+          </div>
+
+          <hr class="releases-list-divider">
+
+          
+    
+          <!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<h1 id="what-is-samzas-integration-test-framework">What is Samza&rsquo;s 
Integration Test Framework?</h1>
+
+<ul>
+<li>  Samza provides an Integration framework which allows you to test 
applications by quickly running them against a few messages and asserting on 
expected results. This alleviates the need to set up dependencies like Kafka, 
Yarn, Zookeeper to test your Samza applications</li>
+<li>  The Integration Framework can test the new StreamDSL (StreamApplication) and 
Task APIs (TaskApplication), and also supports testing legacy low-level 
(StreamTask and AsyncStreamTask) Samza jobs</li>
+</ul>
+
+<h1 id="some-prerequisite-information">Some Prerequisite Information</h1>
+
+<ol>
+<li> Your Samza job will be executed in single container mode and framework 
will set all the required configs for you to run your job (more on configs 
later)</li>
+<li> Your Samza job will read from a special kind of bounded streams 
introduced in the next section, containing finite number of messages to make 
testing feasible.</li>
+</ol>
+
+<h1 id="key-concepts">Key Concepts</h1>
+
+<h2 id="introduction-to-in-memory-system-and-streams">Introduction to In 
Memory System and Streams</h2>
+
+<ol>
+<li> With Samza 1.0 we now get the feature of using streams that are 
maintained in memory using an in memory system.</li>
+<li> These in memory streams are described by InMemoryInputDescriptor, 
InMemoryOutputDescriptor and the corresponding system is described by 
InMemorySystemDescriptor</li>
+<li> These streams are like Kafka streams but their lifecycle is maintained in 
memory, which means they get initialized with your job, are available throughout 
its run and are destroyed after the test ends.</li>
+</ol>
+
+<h2 id="introduction-to-testrunner-api">Introduction to TestRunner api</h2>
+
+<ol>
+<li> Samza 1.0 introduces a new TestRunner api to set up a test for Samza job, 
add configs, configure input/output streams, run the job in testing mode</li>
+<li> TestRunner also provides utilities to consume contents of a stream once 
the test has run successfully</li>
+<li> TestRunner does basic config setup for you by default, you have 
flexibility to change these default configs if required</li>
+<li> TestRunner supports stateless and stateful job testing. TestRunner works 
with InMemoryTables and RocksDB Tables </li>
+</ol>
+
+<h2 id="how-to-write-test">How To Write Test</h2>
+
+<p>For example, here is a StreamApplication that validates and decorates page 
views with viewer’s profile information.</p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    
+    <span class="kd">class</span> <span 
class="nc">BadPageViewFilterApplication</span> <span 
class="kd">implements</span> <span class="n">StreamApplication</span> <span 
class="o">{</span>
+        <span class="nd">@Override</span>
+        <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">describe</span><span class="o">(</span><span 
class="n">StreamApplicationDescriptor</span> <span 
class="n">appDesc</span><span class="o">)</span> <span class="o">{</span> <span 
class="err">…</span> <span class="o">}</span>
+    <span class="o">}</span>
+    
+    <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">BadPageViewFilter</span> <span class="kd">implements</span> <span 
class="n">StreamApplication</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">describe</span><span class="o">(</span><span 
class="n">StreamApplicationDescriptor</span> <span 
class="n">appDesc</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">KafkaSystemDescriptor</span> <span 
class="n">kafka</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;test&quot;</span><span class="o">);</span>
+        <span class="n">InputDescriptor</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViewInput</span> <span class="o">=</span> <span 
class="n">kafka</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;page-views&quot;</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">));</span>
+        <span class="n">OutputDescriptor</span><span 
class="o">&lt;</span><span class="n">DecoratedPageView</span><span 
class="o">&gt;</span> <span class="n">outputPageViews</span> <span 
class="o">=</span> <span class="n">kafka</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">&quot;decorated-page-views&quot;</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">DecoratedPageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">));</span>    
+        <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span 
class="n">appDesc</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="n">pageViewInput</span><span class="o">);</span>
+        <span class="n">pageViews</span><span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span 
class="k">this</span><span class="o">::</span><span 
class="n">isValidPageView</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="k">this</span><span class="o">::</span><span 
class="n">addProfileInformation</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">sendTo</span><span 
class="o">(</span><span class="n">appDesc</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="n">outputPageViews</span><span class="o">));</span>
+      <span class="o">}</span>
+    <span class="o">}</span>
+    </code></pre></figure>
+
+<p>There are 4 simple steps to write a test for your stream processing logic 
and assert on the output</p>
+
+<h2 id="step-1-construct-an-inmemorysystem">Step 1: Construct an 
InMemorySystem</h2>
+
+<p>In the example we are writing we use a Kafka system called 
&ldquo;test&rdquo;, so we will configure an equivalent in memory system (name 
should be the same as used in job) as shown below:   </p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    
+    <span class="n">InMemorySystemDescriptor</span> <span 
class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;test&quot;</span><span class="o">);</span></code></pre></figure>
+
+<h2 id="step-2-initialize-your-input-and-output-streams">Step 2:  Initialize 
your input and output streams</h2>
+
+<ol>
+<li> TestRunner API uses a special kind of input and output streams called in 
memory streams which are easy to define and write assertions on.</li>
+<li> Data in these streams are maintained in memory hence they always use a 
NoOpSerde&lt;&gt;</li>
+<li> You need to configure all the streams that your job reads from or writes to.</li>
+<li> You can obtain a handle to these streams from the system we initialized in 
the previous step</li>
+<li> We have two choices when we configure a stream type </li>
+</ol>
+
+<p>Input Stream described by InMemoryInputDescriptor, these streams need to be 
initialized with messages (data), since your job reads this.</p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>     
+     <span class="n">InMemoryInputDescriptor</span><span 
class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> 
<span class="n">pageViewInput</span> <span class="o">=</span> <span 
class="n">inMemory</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;page-views&quot;</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;());</span></code></pre></figure>
+
+<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span class="na">    INFO</span><span 
class="o">:</span> <span class="s">Use the org.apache.samza.operators.KV as the 
message type ex: InMemoryInputDescriptor&lt;KV&lt;String,PageView&gt;&gt; as 
the message type</span>
+    <span class="err">to</span> <span class="err">use</span> <span 
class="err">key</span> <span class="err">of</span> <span class="err">the</span> 
<span class="err">KV</span> <span class="err">(String</span> <span 
class="err">here)</span> <span class="err">as</span> <span 
class="err">key</span> <span class="err">and</span> <span 
class="err">value</span> <span class="err">as</span> <span 
class="err">message</span> <span class="err">(PageView</span> <span 
class="err">here)</span> <span class="err">for</span> <span 
class="err">the</span> <span class="err">IncomingMessageEnvelope</span> <span 
class="err">in</span> <span class="err">samza</span> <span 
class="err">job,</span> <span class="err">using</span> <span 
class="err">all</span> <span class="err">the</span> <span 
class="err">other</span> <span class="err">data</span> <span 
class="err">types</span> <span class="err">will</span> <span 
class="err">result</span> <span class="err">in</span> <span 
class="err">key</span> <span class="err">
 of</span> <span class="err">the</span> <span 
class="err">IncomingMessageEnvelope</span> <span class="err">set</span> <span 
class="err">to</span> <span class="err">null</span> </code></pre></figure>
+
+<p>Output Stream described by InMemoryOutputDescriptor, these streams need to 
be initialized with a partition count and are empty since your job writes 
to these streams</p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span 
class="n">InMemoryOutputDescriptor</span><span class="o">&lt;</span><span 
class="n">DecoratedPageView</span><span class="o">&gt;</span> <span 
class="n">outputPageViews</span> <span class="o">=</span> <span 
class="n">inMemory</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">&quot;decorated-page-views&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;());</span></code></pre></figure>
+
+<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span class="na">    Note</span><span 
class="o">:</span> <span class="s">Input streams are immutable, i.e., once they 
have been created you can&#39;t modify their contents, e.g., by adding new 
messages. All input streams are supposed to be 
bounded</span></code></pre></figure>
+
+<h2 id="step-3-create-a-testrunner">Step 3: Create a TestRunner</h2>
+
+<ol>
+<li> Initialize a TestRunner of your Samza job</li>
+<li> Configure TestRunner with input streams and the mock data for them</li>
+<li> Configure TestRunner with output streams with a partition count</li>
+<li> Add any configs if necessary</li>
+<li> Run the test runner</li>
+</ol>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">List</span><span 
class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> 
<span class="n">pageViews</span> <span class="o">=</span> <span 
class="n">generateData</span><span class="o">(...);</span>
+    <span class="n">TestRunner</span>
+       <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">BadPageViewFilterApplication</span><span class="o">())</span>
+       <span class="o">.</span><span class="na">addInputStream</span><span 
class="o">(</span><span class="n">pageViewInput</span><span class="o">,</span> 
<span class="n">pageViews</span><span class="o">)</span>
+       <span class="o">.</span><span class="na">addOutputStream</span><span 
class="o">(</span><span class="n">outputPageViews</span><span 
class="o">,</span> <span class="mi">10</span><span class="o">)</span>
+       <span class="o">.</span><span class="na">run</span><span 
class="o">(</span><span class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1500</span><span class="o">));</span></code></pre></figure>
+
+<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span class="na">    Info</span><span 
class="o">:</span> <span class="s">Use addConfig(Map&lt;String, String&gt; 
configs) or addConfig(String key, String value) to add/modify any config in the 
TestRunner</span></code></pre></figure>
+
+<h2 id="step-4-assert-on-the-output-stream">Step 4: Assert on the output 
stream</h2>
+
+<p>You have the following choices for asserting the results of your tests</p>
+
+<ol>
+<li>You can use StreamAssert utils on your In Memory Streams to do consumption 
of all partitions</li>
+</ol>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    
+    <span class="c1">// Consume multi-partitioned stream, key of the map 
represents partitionId</span>
+    <span class="n">Map</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">expectedOutput</span><span class="o">;</span>
+    <span class="n">StreamAssert</span><span class="o">.</span><span 
class="na">containsInOrder</span><span class="o">(</span><span 
class="n">outputPageViews</span><span class="o">,</span> <span 
class="n">expectedOutput</span><span class="o">,</span> <span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">));</span>
+    <span class="c1">// Consume single-partitioned stream</span>
+    <span class="n">StreamAssert</span><span class="o">.</span><span 
class="na">containsInOrder</span><span class="o">(</span><span 
class="n">outputPageViews</span><span class="o">,</span> <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(...),</span> <span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">));</span></code></pre></figure>
+
+<ol start="2">
+<li>You have the flexibility to define your custom assertions using API 
TestRunner.consumeStream() to assert on any partitions of the stream</li>
+</ol>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">Assert</span><span 
class="o">.</span><span class="na">assertEquals</span><span class="o">(</span>
+        <span class="n">TestRunner</span><span class="o">.</span><span 
class="na">consumeStream</span><span class="o">(</span><span 
class="n">outputPageViews</span><span class="o">,</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">)).</span><span 
class="na">get</span><span class="o">(</span><span class="mi">0</span><span 
class="o">).</span><span class="na">size</span><span class="o">(),</span><span 
class="mi">1</span>
+       <span class="o">);</span></code></pre></figure>
+
+<p>A complete look at the code:</p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="nd">@Test</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">testStreamDSLApi</span><span class="o">()</span> <span 
class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
+     <span class="c1">// Generate Mock Data</span>
+     <span class="n">List</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span 
class="n">generateMockInput</span><span class="o">(...);</span>
+     <span class="n">List</span><span class="o">&lt;</span><span 
class="n">DecoratedPageView</span><span class="o">&gt;</span> <span 
class="n">expectedOutput</span> <span class="o">=</span> <span 
class="n">generateMockOutput</span><span class="o">(...);</span>
+    
+     <span class="c1">// Configure System and Stream Descriptors</span>
+     <span class="n">InMemorySystemDescriptor</span> <span 
class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;test&quot;</span><span class="o">);</span>
+     <span class="n">InMemoryInputDescriptor</span><span 
class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> 
<span class="n">pageViewInput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
+        <span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;page-views&quot;</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
+     <span class="n">InMemoryOutputDescriptor</span><span 
class="o">&lt;</span><span class="n">DecoratedPageView</span><span 
class="o">&gt;</span> <span class="n">outputPageView</span> <span 
class="o">=</span> <span class="n">inMemory</span>
+        <span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">&quot;decorated-page-views&quot;</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
+     
+     <span class="c1">// Configure the TestRunner </span>
+     <span class="n">TestRunner</span>
+         <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">BadPageViewFilterApplication</span><span class="o">())</span>
+         <span class="o">.</span><span class="na">addInputStream</span><span 
class="o">(</span><span class="n">pageViewInput</span><span class="o">,</span> 
<span class="n">pageViews</span><span class="o">)</span>
+         <span class="o">.</span><span class="na">addOutputStream</span><span 
class="o">(</span><span class="n">outputPageView</span><span class="o">,</span> 
<span class="mi">10</span><span class="o">)</span>
+         <span class="o">.</span><span class="na">run</span><span 
class="o">(</span><span class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1500</span><span class="o">));</span>
+    
+     <span class="c1">// Assert the results</span>
+     <span class="n">StreamAssert</span><span class="o">.</span><span 
class="na">containsInOrder</span><span class="o">(</span><span 
class="n">expectedOutput</span><span class="o">,</span> <span 
class="n">outputPageView</span><span class="o">,</span> <span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">));</span>
+    <span class="o">}</span></code></pre></figure>
+ 
+
+<h3 id="example-for-low-level-api">Example for Low Level Api:</h3>
+
+<p>For a Low Level Task API</p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="kd">public</span> <span 
class="kd">class</span> <span class="nc">BadPageViewFilter</span> <span 
class="kd">implements</span> <span class="n">TaskApplication</span> <span 
class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">describe</span><span class="o">(</span><span 
class="n">TaskApplicationDescriptor</span> <span class="n">appDesc</span><span 
class="o">)</span> <span class="o">{</span>
+        <span class="c1">// Add input, output streams and tables</span>
+        <span class="n">KafkaSystemDescriptor</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">PageViewEvent</span><span class="o">&gt;</span> <span 
class="n">kafkaSystem</span> <span class="o">=</span> 
+            <span class="k">new</span> <span 
class="n">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;kafka&quot;</span><span class="o">)</span>
+              <span class="o">.</span><span 
class="na">withConsumerZkConnect</span><span class="o">(</span><span 
class="n">myZkServers</span><span class="o">)</span>
+              <span class="o">.</span><span 
class="na">withProducerBootstrapServers</span><span class="o">(</span><span 
class="n">myBrokers</span><span class="o">);</span>
+        <span class="n">KVSerde</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">PageViewEvent</span><span class="o">&gt;</span> <span 
class="n">serde</span> <span class="o">=</span> 
+            <span class="n">KVSerde</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;</span><span class="n">PageViewEvent</span><span 
class="o">&gt;());</span>
+        <span class="c1">// Add input, output streams and tables</span>
+        <span class="n">appDesc</span><span class="o">.</span><span 
class="na">withInputStream</span><span class="o">(</span><span 
class="n">kafkaSystem</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;pageViewEvent&quot;</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">))</span>
+            <span class="o">.</span><span 
class="na">withOutputStream</span><span class="o">(</span><span 
class="n">kafkaSystem</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">&quot;goodPageViewEvent&quot;</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">))</span>
+            <span class="o">.</span><span class="na">withTable</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">RocksDBTableDescriptor</span><span class="o">(</span>
+                <span class="s">&quot;badPageUrlTable&quot;</span><span 
class="o">,</span> <span class="n">KVSerde</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">IntegerSerde</span><span 
class="o">())))</span>
+            <span class="o">.</span><span 
class="na">withTaskFactory</span><span class="o">(</span><span 
class="k">new</span> <span class="n">BadPageViewTaskFactory</span><span 
class="o">());</span>
+      <span class="o">}</span>
+    <span class="o">}</span>
+    
+    <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">BadPageViewTaskFactory</span> <span class="kd">implements</span> 
<span class="n">StreamTaskFactory</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="n">StreamTask</span> <span 
class="nf">createInstance</span><span class="o">()</span> <span 
class="o">{</span>
+        <span class="c1">// Create a new BadPageViewFilterTask instance</span>
+        <span class="k">return</span> <span class="k">new</span> <span 
class="n">BadPageViewFilterTask</span><span class="o">();</span>
+      <span class="o">}</span>
+    <span class="o">}</span>
+    
+     <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">BadPageViewFilterTask</span> <span class="kd">implements</span> 
<span class="n">StreamTask</span> <span class="o">{</span>
+       <span class="nd">@Override</span>
+       <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">process</span><span class="o">(</span><span 
class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span 
class="o">,</span>
+                           <span class="n">MessageCollector</span> <span 
class="n">collector</span><span class="o">,</span>
+                           <span class="n">TaskCoordinator</span> <span 
class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
+         <span class="c1">// process message synchronously</span>
+        <span class="o">}</span>
+     <span class="o">}</span>   
+     
+     
+     <span class="nd">@Test</span>
+     <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">testBadPageViewFilterTaskApplication</span><span class="o">()</span> 
<span class="o">{</span>
+       <span class="n">List</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">badPageViews</span> <span class="o">=</span> <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">generatePageViews</span><span class="o">(..));</span>
+       <span class="n">List</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">expectedGoodPageViews</span> <span class="o">=</span> <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">generatePageViews</span><span class="o">(..));</span>
+     
+       <span class="n">InMemorySystemDescriptor</span> <span 
class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;kafka&quot;</span><span class="o">);</span>
+     
+       <span class="n">InMemoryInputDescriptor</span><span 
class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> 
<span class="n">pageViewInput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
+          <span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;pageViewEvent&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
+     
+       <span class="n">InMemoryOutputDescriptor</span><span 
class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> 
<span class="n">pageViewOutput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
+          <span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">&quot;goodPageViewEvent&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
+     
+       <span class="n">TestRunner</span>
+          <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">BadPageViewFilter</span><span class="o">())</span>
+          <span class="o">.</span><span class="na">addInputStream</span><span 
class="o">(</span><span class="n">pageViewInput</span><span class="o">,</span> 
<span class="n">badPageViews</span><span class="o">)</span>
+          <span class="o">.</span><span class="na">addOutputStream</span><span 
class="o">(</span><span class="n">pageViewOutput</span><span class="o">,</span> 
<span class="mi">1</span><span class="o">)</span>
+          <span class="o">.</span><span class="na">run</span><span 
class="o">(</span><span class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">2</span><span class="o">));</span>
+     
+       <span class="n">StreamAssert</span><span class="o">.</span><span 
class="na">containsInOrder</span><span class="o">(</span><span 
class="n">expectedGoodPageViews</span><span class="o">,</span> <span 
class="n">pageViewOutput</span><span class="o">,</span> <span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">));</span>
+     <span class="o">}</span></code></pre></figure>
+
+<p>Follow a similar approach for Legacy Low Level API, just provide the 
classname 
+(class implementing StreamTask or AsyncStreamTask) to TestRunner</p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>      <span class="kd">public</span> <span 
class="kd">class</span> <span class="nc">MultiplyByTenStreamTask</span> <span 
class="kd">implements</span> <span class="n">StreamTask</span> <span 
class="o">{</span>
+       <span class="nd">@Override</span>
+       <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">process</span><span class="o">(</span><span 
class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span 
class="o">,</span> <span class="n">MessageCollector</span> <span 
class="n">collector</span><span class="o">,</span> <span 
class="n">TaskCoordinator</span> <span class="n">coordinator</span><span 
class="o">)</span>
+           <span class="kd">throws</span> <span class="n">Exception</span> 
<span class="o">{</span>
+         <span class="n">Integer</span> <span class="n">obj</span> <span 
class="o">=</span> <span class="o">(</span><span class="n">Integer</span><span 
class="o">)</span> <span class="n">envelope</span><span class="o">.</span><span 
class="na">getMessage</span><span class="o">();</span>
+         <span class="n">collector</span><span class="o">.</span><span 
class="na">send</span><span class="o">(</span><span class="k">new</span> <span 
class="n">OutgoingMessageEnvelope</span><span class="o">(</span><span 
class="k">new</span> <span class="n">SystemStream</span><span 
class="o">(</span><span class="s">&quot;test&quot;</span><span 
class="o">,</span> <span class="s">&quot;output&quot;</span><span 
class="o">),</span>
+             <span class="n">envelope</span><span class="o">.</span><span 
class="na">getKey</span><span class="o">(),</span> <span 
class="n">envelope</span><span class="o">.</span><span 
class="na">getKey</span><span class="o">(),</span> <span class="n">obj</span> 
<span class="o">*</span> <span class="mi">10</span><span class="o">));</span>
+       <span class="o">}</span>
+      <span class="o">}</span>
+       
+      <span class="nd">@Test</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">testLowLevelApi</span><span class="o">()</span> <span 
class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
+        <span class="n">List</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">inputList</span> <span class="o">=</span> <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span class="mi">1</span><span 
class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span 
class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span 
class="o">,</span> <span class="mi">5</span><span class="o">);</span>
+        <span class="n">List</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">outputList</span> <span class="o">=</span> <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span class="mi">10</span><span 
class="o">,</span> <span class="mi">20</span><span class="o">,</span> <span 
class="mi">30</span><span class="o">,</span> <span class="mi">40</span><span 
class="o">,</span> <span class="mi">50</span><span class="o">);</span>
+       
+        <span class="n">InMemorySystemDescriptor</span> <span 
class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;test&quot;</span><span class="o">);</span>
+       
+        <span class="n">InMemoryInputDescriptor</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> 
<span class="n">numInput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
+           <span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;input&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;</span><span class="n">Integer</span><span 
class="o">&gt;());</span>
+       
+        <span class="n">InMemoryOutputDescriptor</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> 
<span class="n">numOutput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
+           <span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">&quot;output&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;</span><span class="n">Integer</span><span 
class="o">&gt;());</span>
+       
+        <span class="n">TestRunner</span>
+           <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="n">MultiplyByTenStreamTask</span><span 
class="o">.</span><span class="na">class</span><span class="o">)</span>
+           <span class="o">.</span><span class="na">addInputStream</span><span 
class="o">(</span><span class="n">numInput</span><span class="o">,</span> <span 
class="n">inputList</span><span class="o">)</span>
+           <span class="o">.</span><span 
class="na">addOutputStream</span><span class="o">(</span><span 
class="n">numOutput</span><span class="o">,</span> <span 
class="mi">1</span><span class="o">)</span>
+           <span class="o">.</span><span class="na">run</span><span 
class="o">(</span><span class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">1</span><span class="o">));</span>
+       
+        <span class="n">Assert</span><span class="o">.</span><span 
class="na">assertThat</span><span class="o">(</span><span 
class="n">TestRunner</span><span class="o">.</span><span 
class="na">consumeStream</span><span class="o">(</span><span 
class="n">numOutput</span><span class="o">,</span> <span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">)).</span><span 
class="na">get</span><span class="o">(</span><span class="mi">0</span><span 
class="o">),</span>
+           <span class="n">IsIterableContainingInOrder</span><span 
class="o">.</span><span class="na">contains</span><span class="o">(</span><span 
class="n">outputList</span><span class="o">.</span><span 
class="na">toArray</span><span class="o">()));</span>
+      <span class="o">}</span></code></pre></figure>
+
+<h2 id="stateful-testing">Stateful Testing</h2>
+
+<ol>
+<li>There is no additional config/changes required for TestRunner apis for 
testing samza jobs using StreamApplication or TaskApplication APIs</li>
+<li>Legacy task api only supports RocksDbTable and needs the following configs to 
be added to TestRunner. 
+For example if your job is using a RocksDbTable named &ldquo;my-store&rdquo; 
with key and msg serde of String type</li>
+</ol>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">Map</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">config</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">HashMap</span><span class="o">&lt;&gt;();</span>
+    <span class="n">config</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;stores.my-store.factory&quot;</span><span class="o">,</span> 
<span 
class="s">&quot;org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory&quot;</span><span
 class="o">);</span>
+    <span class="n">config</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;serializers.registry.string.class&quot;</span><span 
class="o">,</span> <span 
class="s">&quot;org.apache.samza.serializers.StringSerdeFactory&quot;</span><span
 class="o">);</span>
+    <span class="n">config</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;stores.my-store.key.serde&quot;</span><span class="o">,</span> 
<span class="s">&quot;string&quot;</span><span class="o">);</span>
+    <span class="n">config</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;stores.my-store.msg.serde&quot;</span><span class="o">,</span> 
<span class="s">&quot;string&quot;</span><span class="o">);</span>
+    
+    <span class="n">TestRunner</span>
+        <span class="o">.</span><span class="na">of</span><span 
class="o">(...)</span>
+        <span class="o">.</span><span class="na">addConfig</span><span 
class="o">(</span><span class="n">config</span><span class="o">)</span>
+        <span class="o">...</span>
+        </code></pre></figure>
+
+           
+        </div>
+      </div>
+
+  </div>
+  
+
+  <!-- footer starts here -->
+
+  <!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<footer>
+  <div class="footer-inner">
+    <div class="side-by-side">
+      <div>
+        <div class="footer__heading">Learn More</div>
+        <div class="footer__items">
+          <a class="footer__item" href="/meetups/">Meetups</a>
+          <a class="footer__item" href="/blog/">Blog</a>
+          <a class="footer__item" 
href="/learn/documentation/latest/introduction/background.html">About</a>
+        </div>
+      </div>
+      <div>
+        <div class="footer__heading">Community</div>
+        <div class="footer__items">
+          <a class="footer__item" href="/community/contact-us.html">Contact 
Us</a>
+          <a class="footer__item" 
href="/contribute/contributors-corner.html">Contributors' Corner</a>
+          <a class="footer__item" href="/community/committers.html">PMC 
members and committers</a>
+          <a class="footer__item" href="/powered-by/">Powered By</a>
+        </div>
+      </div>
+
+      <div>
+        <div class="quick-links">
+          <a class="quick-link" href="/startup/download" target="_blank">
+            <i class="icon ion-md-download"></i>
+          </a>
+          <a class="quick-link" 
href="https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tree" 
target="_blank">
+            <i class="icon ion-md-code"></i>
+          </a>
+          <a class="quick-link" href="https://twitter.com/samzastream" 
target="_blank">
+            <i class="icon ion-logo-twitter"></i>
+          </a>
+        </div>
+
+        <p>
+          <script>document.write(new Date().getFullYear());</script> &copy; 
samza.apache.org</p>
+      </div>
+
+    </div>
+  </div>
+
+</footer>
+
+
+<script>
+  var tryFile = function (url, cb) {
+    var myRequest = new Request(url);
+    fetch(myRequest).then((response, cb) => {
+      console.log(response.status); // returns 200
+      cb(response.status != 404);
+    });
+  }
+
+  tryFile(window.location.pathname, function (status) {
+    // do something with the status
+    console.log(status);
+  });
+</script>
+
+
+<!-- Google Analytics -->
+<script>
+  (function (i, s, o, g, r, a, m) { /* Google Analytics loader.
+     * Invoked below with i = window, s = document, o = 'script',
+     * g = the analytics.js URL and r = 'ga'. a and m are not passed in and
+     * act only as local scratch variables. The function records the global
+     * name in window.GoogleAnalyticsObject, makes sure window.ga exists,
+     * then creates a <script> element for g and inserts it before the first
+     * <script> element already in the document. */
+    i['GoogleAnalyticsObject'] = r; i[r] = i[r] || function () { // keep an existing window.ga, otherwise install a stub
+      (i[r].q = i[r].q || []).push(arguments) // the stub appends each call's arguments to ga.q
+    }, i[r].l = 1 * new Date(); a = s.createElement(o), // ga.l = current time as a number; a = the new script element
+      m = s.getElementsByTagName(o)[0]; a.async = 1; a.src = g; 
m.parentNode.insertBefore(a, m)
+  })(window, document, 'script', '//www.google-analytics.com/analytics.js', 
'ga');
+
+  ga('create', 'UA-43122768-1', 'apache.org'); // if the stub above is in place, this call is appended to ga.q
+  ga('send', 'pageview'); // NOTE(review): queued calls are presumably replayed by analytics.js once it loads - confirm
+
+</script>
+<script src="/js/main.new.js"></script>
+
+</body>
+
+</html>
\ No newline at end of file

Modified: samza/site/learn/documentation/versioned/connectors/eventhubs.html
URL: 
http://svn.apache.org/viewvc/samza/site/learn/documentation/versioned/connectors/eventhubs.html?rev=1855804&r1=1855803&r2=1855804&view=diff
==============================================================================
--- samza/site/learn/documentation/versioned/connectors/eventhubs.html 
(original)
+++ samza/site/learn/documentation/versioned/connectors/eventhubs.html Tue Mar 
19 05:31:11 2019
@@ -587,7 +587,7 @@
 
 <p>The Samza EventHubs connector provides access to <a 
href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features";>Azure
 EventHubs</a>, Microsoft’s data streaming service on Azure. An eventhub is 
similar to a Kafka topic and can have multiple partitions with producers and 
consumers. Each message produced or consumed from an event hub is an instance 
of <a 
href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data";>EventData</a>.
 </p>
 
-<p>The <a href="https://github.com/apache/samza-hello-samza";>hello-samza</a> 
project includes an <a 
href="../../../tutorials/versioned/samza-event-hubs-standalone.md">example</a> 
of reading and writing to EventHubs.</p>
+<p>The <a href="https://github.com/apache/samza-hello-samza";>hello-samza</a> 
project includes an <a 
href="../../../tutorials/versioned/samza-event-hubs-standalone.html">example</a>
 of reading and writing to EventHubs.</p>
 
 <h3 id="concepts">Concepts</h3>
 

Modified: samza/site/learn/documentation/versioned/connectors/hdfs.html
URL: 
http://svn.apache.org/viewvc/samza/site/learn/documentation/versioned/connectors/hdfs.html?rev=1855804&r1=1855803&r2=1855804&view=diff
==============================================================================
--- samza/site/learn/documentation/versioned/connectors/hdfs.html (original)
+++ samza/site/learn/documentation/versioned/connectors/hdfs.html Tue Mar 19 
05:31:11 2019
@@ -612,12 +612,10 @@ To interact with HDFS, Samza requires yo
 
 <h4 id="defining-streams">Defining streams</h4>
 
-<p>Samza uses the notion of a <em>system</em> to describe any I/O source it 
interacts with. To consume from HDFS, you should create a new system that 
points to - <code>HdfsSystemFactory</code>. You can then associate multiple 
streams with this <em>system</em>. Each stream should have a <em>physical 
name</em>, which should be set to the name of the directory on HDFS.</p>
+<p>In Samza high level API, you can use <code>HdfsSystemDescriptor</code> to 
create a HDFS system. The stream name should be set to the name of the 
directory on HDFS.</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">systems.hdfs.samza.factory</span><span class="o">=</span><span 
class="s">org.apache.samza.system.hdfs.HdfsSystemFactory</span>
-
-<span class="na">streams.hdfs-clickstream.samza.system</span><span 
class="o">=</span><span class="s">hdfs</span>
-<span class="na">streams.hdfs-clickstream.samza.physical.name</span><span 
class="o">=</span><span 
class="s">hdfs:/data/clickstream/2016/09/11</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">);</span>
+<span class="n">HdfsInputDescriptor</span> <span class="n">hid</span> <span 
class="o">=</span> <span class="n">hsd</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;/data/clickstream/2016/09/11&quot;</span><span 
class="o">);</span></code></pre></figure>
 
 <p>The above example defines a stream called <code>hdfs-clickstream</code> 
that reads data from the <code>/data/clickstream/2016/09/11</code> directory. 
</p>
 
@@ -625,8 +623,9 @@ To interact with HDFS, Samza requires yo
 
 <p>If you only want to consume from files that match a certain pattern, you 
can configure a whitelist. Likewise, you can also blacklist consuming from 
certain files. When both are specified, the <em>whitelist</em> selects the 
files to be filtered and the <em>blacklist</em> is later applied on its 
results. </p>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">systems.hdfs.partitioner.defaultPartitioner.whitelist</span><span 
class="o">=</span><span class="s">.*avro</span>
-<span 
class="na">systems.hdfs.partitioner.defaultPartitioner.blacklist</span><span 
class="o">=</span><span class="s">somefile.avro</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withConsumerWhiteList</span><span class="o">(</span><span 
class="s">&quot;.*avro&quot;</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withConsumerBlackList</span><span class="o">(</span><span 
class="s">&quot;somefile.avro&quot;</span><span 
class="o">);</span></code></pre></figure>
 
 <h3 id="producing-to-hdfs">Producing to HDFS</h3>
 
@@ -634,27 +633,27 @@ To interact with HDFS, Samza requires yo
 
 <p>Samza allows writing your output results to HDFS in AVRO format. You can 
either use avro&rsquo;s GenericRecords or have Samza automatically infer the 
schema for your object using reflection. </p>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span class="c"># set the SystemFactory 
implementation to instantiate HdfsSystemProducer aliased to 
&#39;hdfs&#39;</span>
-<span class="na">systems.hdfs.samza.factory</span><span 
class="o">=</span><span 
class="s">org.apache.samza.system.hdfs.HdfsSystemFactory</span>
-<span class="na">systems.hdfs.producer.hdfs.writer.class</span><span 
class="o">=</span><span 
class="s">org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter</span></code></pre></figure>
-
-<p>If your output is non-avro, you can describe its format by implementing 
your own serializer.</p>
-
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">systems.hdfs.producer.hdfs.writer.class</span><span 
class="o">=</span><span 
class="s">org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter</span>
-<span class="na">serializers.registry.my-serde-name.class</span><span 
class="o">=</span><span class="s">MySerdeFactory</span>
-<span class="na">systems.hdfs.samza.msg.serde</span><span 
class="o">=</span><span class="s">my-serde-name</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withWriterClassName</span><span class="o">(</span><span 
class="n">AvroDataFileHdfsWriter</span><span class="o">.</span><span 
class="na">class</span><span class="o">.</span><span 
class="na">getName</span><span class="o">());</span></code></pre></figure>
+
+<p>If your output is non-avro, use <code>TextSequenceFileHdfsWriter</code>.</p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withWriterClassName</span><span class="o">(</span><span 
class="n">TextSequenceFileHdfsWriter</span><span class="o">.</span><span 
class="na">class</span><span class="o">.</span><span 
class="na">getName</span><span class="o">());</span></code></pre></figure>
 
 <h4 id="output-directory-structure">Output directory structure</h4>
 
 <p>Samza allows you to control the base HDFS directory to write your output. 
You can also organize the output into sub-directories depending on the time 
your application ran, by configuring a date-formatter. </p>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">systems.hdfs.producer.hdfs.base.output.dir</span><span 
class="o">=</span><span class="s">/user/me/analytics/clickstream_data</span>
-<span 
class="na">systems.hdfs.producer.hdfs.bucketer.date.path.format</span><span 
class="o">=</span><span class="s">yyyy_MM_dd</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withOutputBaseDir</span><span class="o">(</span><span 
class="s">&quot;/user/me/analytics/clickstream_data&quot;</span><span 
class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withDatePathFormat</span><span class="o">(</span><span 
class="s">&quot;yyyy_MM_dd&quot;</span><span 
class="o">);</span></code></pre></figure>
 
 <p>You can configure the maximum size of each file or the maximum number of 
records per-file. Once either limit has been reached, Samza will create a new 
file.</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">systems.hdfs.producer.hdfs.write.batch.size.bytes</span><span 
class="o">=</span><span class="s">134217728</span>
-<span 
class="na">systems.hdfs.producer.hdfs.write.batch.size.records</span><span 
class="o">=</span><span class="s">10000</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withWriteBatchSizeBytes</span><span class="o">(</span><span 
class="mi">134217728</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withWriteBatchSizeRecords</span><span class="o">(</span><span 
class="mi">10000</span><span class="o">);</span></code></pre></figure>
 
 <h3 id="security">Security</h3>
 

Modified: samza/site/learn/documentation/versioned/connectors/kinesis.html
URL: 
http://svn.apache.org/viewvc/samza/site/learn/documentation/versioned/connectors/kinesis.html?rev=1855804&r1=1855803&r2=1855804&view=diff
==============================================================================
--- samza/site/learn/documentation/versioned/connectors/kinesis.html (original)
+++ samza/site/learn/documentation/versioned/connectors/kinesis.html Tue Mar 19 
05:31:11 2019
@@ -600,21 +600,15 @@ wraps the Record into a <a href="https:/
 
 <h4 id="basic-configuration">Basic Configuration</h4>
 
-<p>Here is the required configuration for consuming messages from Kinesis. </p>
+<p>Here is the required configuration for consuming messages from Kinesis, 
through <code>KinesisSystemDescriptor</code> and 
<code>KinesisInputDescriptor</code>. </p>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span class="c">// Define a Kinesis system 
factory with your identifier. eg: kinesis-system</span>
-<span class="na">systems.kinesis-system.samza.factory</span><span 
class="o">=</span><span 
class="s">org.apache.samza.system.kinesis.KinesisSystemFactory</span>
-
-<span class="c">// Kinesis consumer works with only 
AllSspToSingleTaskGrouperFactory</span>
-<span class="na">job.systemstreampartition.grouper.factory</span><span 
class="o">=</span><span 
class="s">org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory</span>
-
-<span class="c">// Define your streams</span>
-<span class="na">task.inputs</span><span class="o">=</span><span 
class="s">kinesis-system.input0</span>
-
-<span class="c">// Define required properties for your streams</span>
-<span class="na">systems.kinesis-system.streams.input0.aws.region</span><span 
class="o">=</span><span class="s">YOUR-STREAM-REGION</span>
-<span 
class="na">systems.kinesis-system.streams.input0.aws.accessKey</span><span 
class="o">=</span><span class="s">YOUR-ACCESS_KEY</span>
-<span 
class="na">sensitive.systems.kinesis-system.streams.input0.aws.secretKey</span><span
 class="o">=</span><span class="s">YOUR-SECRET-KEY</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">KinesisSystemDescriptor</span> 
<span class="n">ksd</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">KinesisSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;kinesis&quot;</span><span class="o">);</span>
+    
+<span class="n">KinesisInputDescriptor</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="kt">byte</span><span class="o">[]&gt;&gt;</span> <span 
class="n">kid</span> <span class="o">=</span> 
+    <span class="n">ksd</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;STREAM-NAME&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;</span><span class="kt">byte</span><span 
class="o">[]&gt;())</span>
+          <span class="o">.</span><span class="na">withRegion</span><span 
class="o">(</span><span class="s">&quot;STREAM-REGION&quot;</span><span 
class="o">)</span>
+          <span class="o">.</span><span class="na">withAccessKey</span><span 
class="o">(</span><span class="s">&quot;YOUR-ACCESS_KEY&quot;</span><span 
class="o">)</span>
+          <span class="o">.</span><span class="na">withSecretKey</span><span 
class="o">(</span><span class="s">&quot;YOUR-SECRET-KEY&quot;</span><span 
class="o">);</span></code></pre></figure>
 
 <h4 id="coordination">Coordination</h4>
 
@@ -627,9 +621,11 @@ set your <code>grouper</code> configurat
 
 <p>Each Kinesis stream in a given AWS <a 
href="https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html">region</a>
 can be accessed by providing an <a 
href="https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys">access
 key</a>. An Access key consists of two parts: an access key ID (for example, 
<code>AKIAIOSFODNN7EXAMPLE</code>) and a secret access key (for example, 
<code>wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY</code>) which you can use to 
send programmatic requests to AWS. </p>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">systems.kinesis-system.streams.input0.aws.region</span><span 
class="o">=</span><span class="s">YOUR-STREAM-REGION</span>
-<span 
class="na">systems.kinesis-system.streams.input0.aws.accessKey</span><span 
class="o">=</span><span class="s">YOUR-ACCESS_KEY</span>
-<span 
class="na">sensitive.systems.kinesis-system.streams.input0.aws.secretKey</span><span
 class="o">=</span><span class="s">YOUR-SECRET-KEY</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span 
class="n">KinesisInputDescriptor</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="kt">byte</span><span class="o">[]&gt;&gt;</span> <span 
class="n">kid</span> <span class="o">=</span> 
+    <span class="n">ksd</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;STREAM-NAME&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;</span><span class="kt">byte</span><span 
class="o">[]&gt;())</span>
+          <span class="o">.</span><span class="na">withRegion</span><span 
class="o">(</span><span class="s">&quot;STREAM-REGION&quot;</span><span 
class="o">)</span>
+          <span class="o">.</span><span class="na">withAccessKey</span><span 
class="o">(</span><span class="s">&quot;YOUR-ACCESS_KEY&quot;</span><span 
class="o">)</span>
+          <span class="o">.</span><span class="na">withSecretKey</span><span 
class="o">(</span><span class="s">&quot;YOUR-SECRET-KEY&quot;</span><span 
class="o">);</span></code></pre></figure>
 
 <h3 id="advanced-configuration">Advanced Configuration</h3>
 
@@ -637,25 +633,40 @@ set your <code>grouper</code> configurat
 
 <p>Samza Kinesis Connector uses the <a 
href="https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl">Kinesis
 Client Library</a>
 (KCL) to access the Kinesis data streams. You can set any <a 
href="https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java">KCL
 Configuration</a>
-for a stream by configuring it with the 
<strong>systems.system-name.streams.stream-name.aws.kcl.</strong>* prefix.</p>
+for a stream by configuring it through <code>KinesisInputDescriptor</code>.</p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span 
class="n">KinesisInputDescriptor</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="kt">byte</span><span class="o">[]&gt;&gt;</span> <span 
class="n">kid</span> <span class="o">=</span> <span class="o">...</span>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM</span><span
 class="o">=</span><span class="s">CONFIG-VALUE</span></code></pre></figure>
+<span class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">kclConfig</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">HashMap</span><span class="o">&lt;&gt;();</span>
+<span class="n">kclConfig</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;CONFIG-PARAM&quot;</span><span class="o">,</span> <span 
class="s">&quot;CONFIG-VALUE&quot;</span><span class="o">);</span>
+
+<span class="n">kid</span><span class="o">.</span><span 
class="na">withKCLConfig</span><span class="o">(</span><span 
class="n">kclConfig</span><span class="o">);</span></code></pre></figure>
 
 <p>As an example, the below configuration is equivalent to invoking 
<code>kclClient#WithTableName(myTable)</code> on the KCL instance.</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">systems.system-name.streams.stream-name.aws.kcl.TableName</span><span
 class="o">=</span><span class="s">myTable</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span 
class="n">KinesisInputDescriptor</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="kt">byte</span><span class="o">[]&gt;&gt;</span> <span 
class="n">kid</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">kclConfig</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">HashMap</span><span class="o">&lt;&gt;();</span>
+<span class="n">kclConfig</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;TableName&quot;</span><span class="o">,</span> <span 
class="s">&quot;myTable&quot;</span><span class="o">);</span>
+
+<span class="n">kid</span><span class="o">.</span><span 
class="na">withKCLConfig</span><span class="o">(</span><span 
class="n">kclConfig</span><span class="o">);</span></code></pre></figure>
 
 <h4 id="aws-client-configs">AWS Client configs</h4>
 
 <p>Samza allows you to specify any <a 
href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">AWS
 client configs</a> to connect to your Kinesis instance.
-You can configure any <a 
href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html";>AWS
 client configuration</a> with the 
<code>systems.your-system-name.aws.clientConfig.*</code> prefix.</p>
+You can configure any <a 
href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">AWS
 client configuration</a> through <code>KinesisSystemDescriptor</code>.</p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">Map</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">awsConfig</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">HashMap</span><span class="o">&lt;&gt;();</span>
+<span class="n">awsConfig</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;CONFIG-PARAM&quot;</span><span class="o">,</span> <span 
class="s">&quot;CONFIG-VALUE&quot;</span><span class="o">);</span>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">systems.system-name.aws.clientConfig.CONFIG-PARAM</span><span 
class="o">=</span><span class="s">CONFIG-VALUE</span></code></pre></figure>
+<span class="n">KinesisSystemDescriptor</span> <span class="n">sd</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">KinesisSystemDescriptor</span><span class="o">(</span><span 
class="n">systemName</span><span class="o">)</span>
+                                          <span class="o">.</span><span 
class="na">withAWSConfig</span><span class="o">(</span><span 
class="n">awsConfig</span><span class="o">);</span></code></pre></figure>
 
-<p>As an example, to set the <em>proxy host</em> and <em>proxy port</em> to be 
used by the Kinesis Client:</p>
+<p>Through <code>KinesisSystemDescriptor</code> you can also set the <em>proxy 
host</em> and <em>proxy port</em> to be used by the Kinesis Client:</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">systems.system-name.aws.clientConfig.ProxyHost</span><span 
class="o">=</span><span class="s">my-proxy-host.com</span>
-<span class="na">systems.system-name.aws.clientConfig.ProxyPort</span><span 
class="o">=</span><span class="s">my-proxy-port</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">KinesisSystemDescriptor</span> 
<span class="n">sd</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">KinesisSystemDescriptor</span><span class="o">(</span><span 
class="n">systemName</span><span class="o">)</span>
+                                          <span class="o">.</span><span 
class="na">withProxyHost</span><span class="o">(</span><span 
class="s">&quot;YOUR-PROXY-HOST&quot;</span><span class="o">)</span>
+                                          <span class="o">.</span><span 
class="na">withProxyPort</span><span class="o">(</span><span 
class="n">YOUR</span><span class="o">-</span><span class="n">PROXY</span><span 
class="o">-</span><span class="n">PORT</span><span 
class="o">);</span></code></pre></figure>
 
 <h3 id="resetting-offsets">Resetting Offsets</h3>
 
@@ -663,12 +674,30 @@ You can configure any <a href="http://do
 These checkpoints are stored and managed by the KCL library internally. You 
can reset the checkpoints by configuring a different name for the DynamoDB 
table. </p>
 
 <figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span class="c">// change the TableName to 
a unique name to reset checkpoints.</span>
-<span 
class="na">systems.kinesis-system.streams.input0.aws.kcl.TableName</span><span 
class="o">=</span><span class="s">my-app-table-name</span></code></pre></figure>
+<span 
class="na">systems.kinesis-system.streams.STREAM-NAME.aws.kcl.TableName</span><span
 class="o">=</span><span 
class="s">my-app-table-name</span></code></pre></figure>
+
+<p>Or through <code>KinesisInputDescriptor</code>:</p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span 
class="n">KinesisInputDescriptor</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="kt">byte</span><span class="o">[]&gt;&gt;</span> <span 
class="n">kid</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">kclConfig</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">HashMap</span><span class="o">&lt;&gt;();</span>
+<span class="n">kclConfig</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;TableName&quot;</span><span class="o">,</span> <span 
class="s">&quot;my-new-app-table-name&quot;</span><span class="o">);</span>
+
+<span class="n">kid</span><span class="o">.</span><span 
class="na">withKCLConfig</span><span class="o">(</span><span 
class="n">kclConfig</span><span class="o">);</span></code></pre></figure>
 
 <p>When you reset checkpoints, you can configure your job to start consuming 
from either the earliest or latest offset in the stream.  </p>
 
 <figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span class="c">// set the starting 
position to either TRIM_HORIZON (oldest) or LATEST (latest)</span>
-<span 
class="na">systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream</span><span
 class="o">=</span><span class="s">LATEST</span></code></pre></figure>
+<span 
class="na">systems.kinesis-system.streams.STREAM-NAME.aws.kcl.InitialPositionInStream</span><span
 class="o">=</span><span class="s">LATEST</span></code></pre></figure>
+
+<p>Or through <code>KinesisInputDescriptor</code>:</p>
+
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span 
class="n">KinesisInputDescriptor</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="kt">byte</span><span class="o">[]&gt;&gt;</span> <span 
class="n">kid</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">kclConfig</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">HashMap</span><span class="o">&lt;&gt;();</span>
+<span class="n">kclConfig</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;InitialPositionInStream&quot;</span><span class="o">,</span> 
<span class="s">&quot;LATEST&quot;</span><span class="o">);</span>
+
+<span class="n">kid</span><span class="o">.</span><span 
class="na">withKCLConfig</span><span class="o">(</span><span 
class="n">kclConfig</span><span class="o">);</span></code></pre></figure>
 
 <p>Alternately, if you want to start from a particular offset in the Kinesis 
stream, you can log in to the <a 
href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html">AWS
 console</a> and edit the offsets in your DynamoDB Table.
 By default, the table-name has the following format: &ldquo;&lt;job 
name&gt;-&lt;job id&gt;-&lt;kinesis stream&gt;&rdquo;.</p>


Reply via email to