Added: samza/site/learn/documentation/versioned/api/test-framework.html URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/versioned/api/test-framework.html?rev=1855804&view=auto ============================================================================== --- samza/site/learn/documentation/versioned/api/test-framework.html (added) +++ samza/site/learn/documentation/versioned/api/test-framework.html Tue Mar 19 05:31:11 2019 @@ -0,0 +1,967 @@ +<!DOCTYPE html> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<html lang="en"> + +<head> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no"> + <meta charset="utf-8"> + <title>Samza - Testing Samza jobs: Integration Framework</title> + <link rel="apple-touch-icon-precomposed" sizes="57x57" href="/img/favicon/apple-touch-icon-57x57.png" /> + <link rel="apple-touch-icon-precomposed" sizes="114x114" href="/img/favicon/apple-touch-icon-114x114.png" /> + <link rel="apple-touch-icon-precomposed" sizes="72x72" href="/img/favicon/apple-touch-icon-72x72.png" /> + <link rel="apple-touch-icon-precomposed" sizes="144x144" href="/img/favicon/apple-touch-icon-144x144.png" /> + <link rel="apple-touch-icon-precomposed" sizes="60x60" href="/img/favicon/apple-touch-icon-60x60.png" /> + <link rel="apple-touch-icon-precomposed" sizes="120x120" href="/img/favicon/apple-touch-icon-120x120.png" /> + <link rel="apple-touch-icon-precomposed" sizes="76x76" href="/img/favicon/apple-touch-icon-76x76.png" /> + <link rel="apple-touch-icon-precomposed" sizes="152x152" href="/img/favicon/apple-touch-icon-152x152.png" /> + <link rel="icon" type="image/png" href="/img/favicon/favicon-196x196.png" sizes="196x196" /> + <link rel="icon" type="image/png" href="/img/favicon/favicon-96x96.png" sizes="96x96" /> + <link rel="icon" type="image/png" href="/img/favicon/favicon-32x32.png" sizes="32x32" /> + <link rel="icon" type="image/png" href="/img/favicon/favicon-16x16.png" sizes="16x16" /> + <link rel="icon" type="image/png" href="/img/favicon/favicon-128.png" sizes="128x128" /> + <meta name="application-name" content="https://samza.apache.org" /> + <meta name="msapplication-TileColor" content="#FFFFFF" /> + <meta name="msapplication-TileImage" content="/img/favicon/mstile-144x144.png" /> + <meta name="msapplication-square70x70logo" content="/img/favicon/mstile-70x70.png" /> + <meta name="msapplication-square150x150logo" content="/img/favicon/mstile-150x150.png" /> + 
<meta name="msapplication-wide310x150logo" content="/img/favicon/mstile-310x150.png" /> + <meta name="msapplication-square310x310logo" content="/img/favicon/mstile-310x310.png" /> + <link href="/css/ionicons.min.css" rel="stylesheet"> + <link href="/css/google-fonts.css" rel="stylesheet"> + <link href="/css/syntax.css" rel="stylesheet"/> + <link rel="stylesheet" href="/css/main.new.css" /> +</head> + +<body class="page"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<div class="main-navigation" data-plugin="menu"> + <div class="main-navigation__toggle" data-menu-closed> + <i class="icon ion-md-menu"></i> + </div> + <div class="main-navigation__toggle main-navigation__toggle--opened" data-menu-opened> + <i class="icon ion-md-close"></i> + </div> + <div class="main-navigation__inner"> + <div class="main-navigation__logo"> + <a href="/"> + <img class="main-navigation__logo-img" src="/img/samza-logo.png" srcset="/img/samza-logo.png 1x, /img/[email protected] 2x" + alt="Samza Logo" /> + </a> + </div> + <div class="main-navigation__items" data-menu-opened> + <a class="main-navigation__item" href="/">Home</a> + <a class="main-navigation__item" href="/learn/documentation/latest/core-concepts/core-concepts.html">Docs</a> + <a class="main-navigation__item" href="/powered-by/">Powered By</a> + <a class="main-navigation__item" href="/startup/download/">Downloads</a> + <a class="main-navigation__item" href="/blog/">Blog</a> + <div class="main-navigation__item main-navigation__item--group"> + <div class="main-navigation__item-group-title"> + Community + <i class="icon ion-md-arrow-dropdown"></i> + </div> + <div class="main-navigation__item-group-list"> + <a class="main-navigation__item" href="/community/contact-us.html">Contact Us</a> + <a class="main-navigation__item" href="/contribute/contributors-corner.html">Contributor's Corner</a> + <a class="main-navigation__item" href="/community/committers.html">PMC Members and committers</a> + <a class="main-navigation__item" href="/meetups/">Talks and Meetups</a> + </div> + </div> + </div> + </div> +</div> + + <div class="container"> + <div class="container__toggle"> + <i class="icon ion-md-arrow-dropleft-circle container__toggle-icon"></i> + <i class="icon ion-md-arrow-dropright-circle container__toggle-icon container__toggle-icon--opened"></i> + </div> + + <!-- There is only one menu, but made it as a no-output collection to grab data only --> + + <!-- + Licensed to the Apache Software 
Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + + +<div class="side-navigation"> + + + + + + + <!-- Start Group --> + + <div class="side-navigation__group side-navigation__group--has-nested" data-plugin="sub-menu" data-sub-menu-show-class="side-navigation__group--has-nested-visible"> + + + <!-- Make menu_title, and start items group if needed --> + + <div class="side-navigation__group-title"> + <i class="side-navigation__group-title-icon icon ion-md-arrow-dropdown"></i> + Getting Started + </div> + <div class="side-navigation__group-items " data-sub-menu > + + + <!-- Handle sub navigation items from data --> + + + + <a class="side-navigation__group-item" data-match-active="" href="/startup/quick-start/latest/">QuickStart</a> + + + <a class="side-navigation__group-item" data-match-active="" href="/startup/code-examples/latest/">Code Examples</a> + + + + + <!-- Handle sub nagivation from site collections --> + + + <!-- Close sub nav group --> + + </div> + + + <!-- Close menu group --> + </div> + + + + + + + <!-- Start Group --> + + <div class="side-navigation__group side-navigation__group--has-nested" data-plugin="sub-menu" data-sub-menu-show-class="side-navigation__group--has-nested-visible"> + + + <!-- Make menu_title, and start items group if needed --> + + <div 
class="side-navigation__group-title"> + <i class="side-navigation__group-title-icon icon ion-md-arrow-dropdown"></i> + Documentation + </div> + <div class="side-navigation__group-items side-navigation__group-has-submenus" data-sub-menu data-documentation="/learn/documentation/latest/"> + + + <!-- Handle sub navigation items from data --> + + + <!-- Handle sub nagivation from site collections --> + + + <!-- Close sub nav group --> + + </div> + + + <!-- Close menu group --> + </div> + + + + + + + <!-- Start Group --> + + <div class="side-navigation__group side-navigation__group--has-nested" data-plugin="sub-menu" data-sub-menu-show-class="side-navigation__group--has-nested-visible"> + + + <!-- Make menu_title, and start items group if needed --> + + <div class="side-navigation__group-title"> + <i class="side-navigation__group-title-icon icon ion-md-arrow-dropdown"></i> + Releases + </div> + <div class="side-navigation__group-items " data-sub-menu > + + + <!-- Handle sub navigation items from data --> + + + + <a class="side-navigation__group-item" data-match-active="" href="/releases/1.0.0">1.0.0</a> + + + <a class="side-navigation__group-item" data-match-active="" href="/releases/0.14">0.14</a> + + + <a class="side-navigation__group-item" data-match-active="" href="/releases/0.13">0.13</a> + + + <a class="side-navigation__group-item" data-match-active="" href="/releases/0.12">0.12</a> + + + <a class="side-navigation__group-item" data-match-active="" href="/releases/0.11">0.11</a> + + + <a class="side-navigation__group-item" data-match-active="" href="/releases/0.10">0.10</a> + + + + + <!-- Handle sub nagivation from site collections --> + + + <!-- Close sub nav group --> + + </div> + + + <!-- Close menu group --> + </div> + + + + + + + <!-- Start Group --> + + <div class="side-navigation__group"> + + + <!-- Make menu_title, and start items group if needed --> + + <a class="side-navigation__group-title" data-plugin="top-menu" data-match-active="" href="/blog/"> + Blog 
+ </a> + + + <!-- Handle sub navigation items from data --> + + + <!-- Handle sub nagivation from site collections --> + + + <!-- Close sub nav group --> + + + <!-- Close menu group --> + </div> + + + + + + + <!-- Start Group --> + + <div class="side-navigation__group side-navigation__group--has-nested" data-plugin="sub-menu" data-sub-menu-show-class="side-navigation__group--has-nested-visible"> + + + <!-- Make menu_title, and start items group if needed --> + + <div class="side-navigation__group-title"> + <i class="side-navigation__group-title-icon icon ion-md-arrow-dropdown"></i> + Community + </div> + <div class="side-navigation__group-items " data-sub-menu > + + + <!-- Handle sub navigation items from data --> + + + + <a class="side-navigation__group-item" data-match-active="" href="/community/contact-us.html">Contact Us</a> + + + <a class="side-navigation__group-item" data-match-active="" href="/contribute/contributors-corner.html">Contributor's Corner</a> + + + <a class="side-navigation__group-item" data-match-active="" href="/contribute/enhancement-proposal.html">Enhancement Proposal</a> + + + <a class="side-navigation__group-item" data-match-active="" href="/community/committers.html">PMC members & Committers</a> + + + <a class="side-navigation__group-item" data-match-active="" href="/meetups/">Talks and Meetups</a> + + + + + <!-- Handle sub nagivation from site collections --> + + + <!-- Close sub nav group --> + + </div> + + + <!-- Close menu group --> + </div> + + + + + + + <!-- Start Group --> + + <div class="side-navigation__group side-navigation__group--has-nested" data-plugin="sub-menu" data-sub-menu-show-class="side-navigation__group--has-nested-visible"> + + + <!-- Make menu_title, and start items group if needed --> + + <div class="side-navigation__group-title"> + <i class="side-navigation__group-title-icon icon ion-md-arrow-dropdown"></i> + Case Studies + </div> + <div class="side-navigation__group-items " data-sub-menu > + + + <!-- Handle sub 
navigation items from data --> + + + + <a class="side-navigation__group-item" data-match-active="exact" href="/case-studies/">View All</a> + + + <hr> + + + + + <!-- Handle sub nagivation from site collections --> + + + + + + + + + + + + + + + + + + + + + + + + <a class="side-navigation__group-item" href="/case-studies/ebay" data-match-active="">eBay</a> + + + + + + + + + + + + + + + + + + + + <a class="side-navigation__group-item" href="/case-studies/tripadvisor" data-match-active="">TripAdvisor</a> + + + + + + + + + + + + + + + + <a class="side-navigation__group-item" href="/case-studies/slack" data-match-active="">Slack</a> + + + + + + + + + + + + + + <a class="side-navigation__group-item" href="/case-studies/optimizely" data-match-active="">Optimizely</a> + + + + + + + + + + + + + + <a class="side-navigation__group-item" href="/case-studies/redfin" data-match-active="">Redfin</a> + + + + + + + + + + + + + + <a class="side-navigation__group-item" href="/case-studies/linkedin" data-match-active="">LinkedIn</a> + + + + + + + + + + + <!-- Close sub nav group --> + + </div> + + + <!-- Close menu group --> + </div> + + + +</div> + + + + <div class="section"> + <div class="content"> + + <h2>Testing Samza jobs: Integration Framework</h2> + + + + + <div class="releases-list-container"> + + <span>Releases</span> + + <ul class="releases-list" data-releases-list> + <li class="hide"><a href="/learn/documentation/latest/api/test-framework">latest</a></li> + + + + + <li class="hide"><a href="/learn/documentation/0.14/api/test-framework">0.14</a></li> + + + + <li class="hide"><a href="/learn/documentation/0.13/api/test-framework">0.13</a></li> + + + + <li class="hide"><a href="/learn/documentation/0.12/api/test-framework">0.12</a></li> + + + + <li class="hide"><a href="/learn/documentation/0.11/api/test-framework">0.11</a></li> + + + + <li class="hide"><a href="/learn/documentation/0.10/api/test-framework">0.10</a></li> + + + + <li class="hide"><a 
href="/learn/documentation/1.0/api/test-framework">1.0</a></li> + + + + <li class="hide"><a href="/learn/documentation/0.9/api/test-framework">0.9</a></li> + + + + <li class="hide"><a href="/learn/documentation/0.8/api/test-framework">0.8</a></li> + + + + <li class="hide"><a href="/learn/documentation/0.7/api/test-framework">0.7</a></li> + + + + <li class="hide"><a href="/learn/documentation/0.7.0/api/test-framework">0.7.0</a></li> + + + + <li class="hide"><a href="/learn/documentation/-1000/api/test-framework">-1000</a></li> + + + + </ul> + + </div> + + <hr class="releases-list-divider"> + + + + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<h1 id="what-is-samzas-integration-test-framework">What is Samza’s Integration Test Framework?</h1> + +<ul> +<li> Samza provides an integration framework that allows you to test applications by quickly running them against a few messages and asserting on expected results.
This alleviates the need to set up dependencies like Kafka, YARN, and ZooKeeper to test your Samza applications.</li> +<li> The integration framework can test applications written with the new StreamDSL (StreamApplication) and Task APIs (TaskApplication), and it also supports testing legacy low-level (StreamTask and AsyncStreamTask) Samza jobs.</li> +</ul> + +<h1 id="some-prerequisite-information">Some Prerequisite Information</h1> + +<ol> +<li> Your Samza job will be executed in single-container mode, and the framework will set all the required configs for you to run your job (more on configs later).</li> +<li> Your Samza job will read from a special kind of bounded stream, introduced in the next section, that contains a finite number of messages to make testing feasible.</li> +</ol> + +<h1 id="key-concepts">Key Concepts</h1> + +<h2 id="introduction-to-in-memory-system-and-streams">Introduction to In Memory System and Streams</h2> + +<ol> +<li> Samza 1.0 introduces streams that are maintained in memory by an in-memory system.</li> +<li> These in-memory streams are described by InMemoryInputDescriptor and InMemoryOutputDescriptor, and the corresponding system is described by InMemorySystemDescriptor.</li> +<li> These streams are like Kafka streams, but their lifecycle is maintained in memory: they are initialized with your job, are available throughout its run, and are destroyed after the test ends.</li> +</ol> + +<h2 id="introduction-to-testrunner-api">Introduction to TestRunner API</h2> + +<ol> +<li> Samza 1.0 introduces a new TestRunner API to set up a test for a Samza job, add configs, configure input/output streams, and run the job in testing mode.</li> +<li> TestRunner also provides utilities to consume the contents of a stream once the test has run successfully.</li> +<li> TestRunner does basic config setup for you by default; you have the flexibility to change these default configs if required.</li> +<li> TestRunner supports stateless and stateful job testing.
TestRunner works with InMemoryTables and RocksDB tables.</li> +</ol> + +<h2 id="how-to-write-test">How To Write a Test</h2> + +<p>For example, here is a StreamApplication that validates and decorates page views with the viewer’s profile information.</p> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> + <span class="kd">class</span> <span class="nc">BadPageViewFilterApplication</span> <span class="kd">implements</span> <span class="n">StreamApplication</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="n">StreamApplicationDescriptor</span> <span class="n">appDesc</span><span class="o">)</span> <span class="o">{</span> <span class="err">…</span> <span class="o">}</span> + <span class="o">}</span> + + <span class="kd">public</span> <span class="kd">class</span> <span class="nc">BadPageViewFilter</span> <span class="kd">implements</span> <span class="n">StreamApplication</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="n">StreamApplicationDescriptor</span> <span class="n">appDesc</span><span class="o">)</span> <span class="o">{</span> + <span class="n">KafkaSystemDescriptor</span> <span class="n">kafka</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">"test"</span><span class="o">);</span> + <span class="n">InputDescriptor</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViewInput</span> <span class="o">=</span> <span class="n">kafka</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"page-views"</span><span
class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span> + <span class="n">OutputDescriptor</span><span class="o"><</span><span class="n">DecoratedPageView</span><span class="o">></span> <span class="n">outputPageViews</span> <span class="o">=</span> <span class="n">kafka</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"decorated-page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">DecoratedPageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span> + <span class="n">MessageStream</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">appDesc</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">pageViewInput</span><span class="o">);</span> + <span class="n">pageViews</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">isValidPageView</span><span class="o">)</span> + <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">addProfileInformation</span><span class="o">)</span> + <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">appDesc</span><span class="o">.</span><span class="na">getOutputStream</span><span
class="o">(</span><span class="n">outputPageViews</span><span class="o">));</span> + <span class="o">}</span> + <span class="o">}</span> + </code></pre></figure> + +<p>There are 4 simple steps to write a test for your stream processing logic and assert on the output:</p> + +<h2 id="step-1-construct-an-inmemorysystem">Step 1: Construct an InMemorySystem</h2> + +<p>In the example we are writing, we use a Kafka system called “test”, so we will configure an equivalent in-memory system (its name must be the same as the one used in the job) as shown below:</p> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> + <span class="n">InMemorySystemDescriptor</span> <span class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span class="s">"test"</span><span class="o">);</span></code></pre></figure> + +<h2 id="step-2-initialize-your-input-and-output-streams">Step 2: Initialize your input and output streams</h2> + +<ol> +<li> The TestRunner API uses special input and output streams, called in-memory streams, which are easy to define and write assertions on.</li> +<li> Data in these streams is maintained in memory, hence they always use a NoOpSerde&lt;&gt;.</li> +<li> You need to configure all the streams that your job reads from or writes to.
</li> +<li> You can obtain a handle to these streams from the system we initialized in the previous step.</li> +<li> We have two choices when we configure a stream type:</li> +</ol> + +<p>Input streams are described by InMemoryInputDescriptor; these streams need to be initialized with messages (data), since your job reads them.</p> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> + <span class="n">InMemoryInputDescriptor</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViewInput</span> <span class="o">=</span> <span class="n">inMemory</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><>());</span></code></pre></figure> + +<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na"> INFO</span><span class="o">:</span> <span class="s">Use the org.apache.samza.operators.KV as the message type ex: InMemoryInputDescriptor&lt;KV&lt;String,PageView&gt;&gt; as the message type</span> + <span class="err">to</span> <span class="err">use</span> <span class="err">key</span> <span class="err">of</span> <span class="err">the</span> <span class="err">KV</span> <span class="err">(String</span> <span class="err">here)</span> <span class="err">as</span> <span class="err">key</span> <span class="err">and</span> <span class="err">value</span> <span class="err">as</span> <span class="err">message</span> <span class="err">(PageView</span> <span class="err">here)</span> <span class="err">for</span> <span class="err">the</span> <span class="err">IncomingMessageEnvelope</span> <span class="err">in</span> <span class="err">samza</span> <span class="err">job,</span> <span
class="err">using</span> <span class="err">all</span> <span class="err">the</span> <span class="err">other</span> <span class="err">data</span> <span class="err">types</span> <span class="err">will</span> <span class="err">result</span> <span class="err">in</span> <span class="err">key</span> <span class="err">of</span> <span class="err">the</span> <span class="err">IncomingMessageEnvelope</span> <span class="err">set</span> <span class="err">to</span> <span class="err">null</span> </code></pre></figure> + +<p>Output streams are described by InMemoryOutputDescriptor; these streams need to be initialized with a partition count and are empty, since your job writes to them.</p> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="n">InMemoryOutputDescriptor</span><span class="o"><</span><span class="n">DecoratedPageView</span><span class="o">></span> <span class="n">outputPageViews</span> <span class="o">=</span> <span class="n">inMemory</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"decorated-page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><>());</span></code></pre></figure> + +<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na"> Note</span><span class="o">:</span> <span class="s">Input streams are immutable, i.e., once they have been created you can't modify their contents (e.g. by adding new messages). All input streams are supposed to be bounded.</span></code></pre></figure> + +<h2 id="step-3-create-a-testrunner">Step 3: Create a TestRunner</h2> + +<ol> +<li> Initialize a TestRunner for your Samza job</li> +<li> Configure the TestRunner with input streams and the mock data for them</li> +<li> Configure the TestRunner with output streams and a partition count</li> +<li> Add any configs if
necessary</li> +<li> Run the TestRunner</li> +</ol> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="n">List</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">generateData</span><span class="o">(...);</span> + <span class="n">TestRunner</span> + <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">BadPageViewFilterApplication</span><span class="o">())</span> + <span class="o">.</span><span class="na">addInputStream</span><span class="o">(</span><span class="n">pageViewInput</span><span class="o">,</span> <span class="n">pageViews</span><span class="o">)</span> + <span class="o">.</span><span class="na">addOutputStream</span><span class="o">(</span><span class="n">outputPageViews</span><span class="o">,</span> <span class="mi">10</span><span class="o">)</span> + <span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1500</span><span class="o">));</span></code></pre></figure> + +<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na"> Info</span><span class="o">:</span> <span class="s">Use addConfig(Map&lt;String, String&gt; configs) or addConfig(String key, String value) to add/modify any config in the TestRunner</span></code></pre></figure> + +<h2 id="step-4-assert-on-the-output-stream">Step 4: Assert on the output stream</h2> + +<p>You have the following choices for asserting the results of your tests:</p> + +<ol> +<li>You can use the StreamAssert utilities on your in-memory streams to consume all partitions</li> +</ol> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> + <span class="c1">//
Consume multi-partitioned stream, key of the map represents partitionId</span> + <span class="n">Map</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">DecoratedPageView</span><span class="o">>></span> <span class="n">expectedOutput</span><span class="o">;</span> + <span class="n">StreamAssert</span><span class="o">.</span><span class="na">containsInOrder</span><span class="o">(</span><span class="n">expectedOutput</span><span class="o">,</span> <span class="n">outputPageViews</span><span class="o">,</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1000</span><span class="o">));</span> + <span class="c1">// Consume single partitioned stream</span> + <span class="n">StreamAssert</span><span class="o">.</span><span class="na">containsInOrder</span><span class="o">(</span><span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(...),</span> <span class="n">outputPageViews</span><span class="o">,</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1000</span><span class="o">));</span></code></pre></figure> + +<ol start="2"> +<li>You have the flexibility to define custom assertions using the TestRunner.consumeStream() API to assert on any partition of the stream</li> +</ol> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="n">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span> + <span class="n">TestRunner</span><span class="o">.</span><span class="na">consumeStream</span><span class="o">(</span><span class="n">outputPageViews</span><span class="o">,</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span 
class="o">(</span><span class="mi">1000</span><span class="o">)).</span><span
class="na">get</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">size</span><span class="o">(),</span><span class="mi">1</span> + <span class="o">);</span></code></pre></figure> + +<p>The complete code at a glance:</p> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="nd">@Test</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">testStreamDSLApi</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + <span class="c1">// Generate Mock Data</span> + <span class="n">List</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">generateMockInput</span><span class="o">(...);</span> + <span class="n">List</span><span class="o"><</span><span class="n">DecoratedPageView</span><span class="o">></span> <span class="n">expectedOutput</span> <span class="o">=</span> <span class="n">generateMockOutput</span><span class="o">(...);</span> + + <span class="c1">// Configure System and Stream Descriptors</span> + <span class="n">InMemorySystemDescriptor</span> <span class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span class="s">"test"</span><span class="o">);</span> + <span class="n">InMemoryInputDescriptor</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViewInput</span> <span class="o">=</span> <span class="n">inMemory</span> + <span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"page-views"</span><span class="o">,</span> <span class="k">new</span> <span
class="n">NoOpSerde</span><span class="o"><>());</span> + <span class="n">InMemoryOutputDescriptor</span><span class="o"><</span><span class="n">DecoratedPageView</span><span class="o">></span> <span class="n">outputPageView</span> <span class="o">=</span> <span class="n">inMemory</span> + <span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"decorated-page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><>());</span> + + <span class="c1">// Configure the TestRunner</span> + <span class="n">TestRunner</span> + <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">BadPageViewFilterApplication</span><span class="o">())</span> + <span class="o">.</span><span class="na">addInputStream</span><span class="o">(</span><span class="n">pageViewInput</span><span class="o">,</span> <span class="n">pageViews</span><span class="o">)</span> + <span class="o">.</span><span class="na">addOutputStream</span><span class="o">(</span><span class="n">outputPageView</span><span class="o">,</span> <span class="mi">10</span><span class="o">)</span> + <span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1500</span><span class="o">));</span> + + <span class="c1">// Assert the results</span> + <span class="n">StreamAssert</span><span class="o">.</span><span class="na">containsInOrder</span><span class="o">(</span><span class="n">expectedOutput</span><span class="o">,</span> <span class="n">outputPageView</span><span class="o">,</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span
class="o">(</span><span class="mi">1000</span><span class="o">));</span> + <span class="o">}</span></code></pre></figure> + + +<h3 id="example-for-low-level-api">Example for Low Level API</h3> + +<p>For an application written with the Low Level Task API:</p> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="kd">public</span> <span class="kd">class</span> <span class="nc">BadPageViewFilter</span> <span class="kd">implements</span> <span class="n">TaskApplication</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="n">TaskApplicationDescriptor</span> <span class="n">appDesc</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// Define the Kafka system and serde</span> + <span class="n">KafkaSystemDescriptor</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">PageViewEvent</span><span class="o">></span> <span class="n">kafkaSystem</span> <span class="o">=</span> + <span class="k">new</span> <span class="n">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withConsumerZkConnect</span><span class="o">(</span><span class="n">myZkServers</span><span class="o">)</span> + <span class="o">.</span><span class="na">withProducerBootstrapServers</span><span class="o">(</span><span class="n">myBrokers</span><span class="o">);</span> + <span class="n">KVSerde</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">PageViewEvent</span><span class="o">></span> <span class="n">serde</span> <span class="o">=</span> + <span class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span
class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><</span><span class="n">PageViewEvent</span><span class="o">>());</span> + <span class="c1">// Add input, output streams and tables</span> + <span class="n">appDesc</span><span class="o">.</span><span class="na">withInputStream</span><span class="o">(</span><span class="n">kafkaSystem</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"pageViewEvent"</span><span class="o">,</span> <span class="n">serde</span><span class="o">))</span> + <span class="o">.</span><span class="na">withOutputStream</span><span class="o">(</span><span class="n">kafkaSystem</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"goodPageViewEvent"</span><span class="o">,</span> <span class="n">serde</span><span class="o">))</span> + <span class="o">.</span><span class="na">withTable</span><span class="o">(</span><span class="k">new</span> <span class="n">RocksDBTableDescriptor</span><span class="o">(</span> + <span class="s">"badPageUrlTable"</span><span class="o">,</span> <span class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">IntegerSerde</span><span class="o">())))</span> + <span class="o">.</span><span class="na">withTaskFactory</span><span class="o">(</span><span class="k">new</span> <span class="n">BadPageViewTaskFactory</span><span class="o">());</span> + <span class="o">}</span> + <span class="o">}</span> + + <span class="kd">public</span> <span class="kd">class</span> <span class="nc">BadPageViewTaskFactory</span> 
<span class="kd">implements</span> <span class="n">StreamTaskFactory</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">StreamTask</span> <span class="nf">createInstance</span><span class="o">()</span> <span class="o">{</span> + <span class="c1">// Create a new task instance</span> + <span class="k">return</span> <span class="k">new</span> <span class="n">BadPageViewFilterTask</span><span class="o">();</span> + <span class="o">}</span> + <span class="o">}</span> + + <span class="kd">public</span> <span class="kd">class</span> <span class="nc">BadPageViewFilterTask</span> <span class="kd">implements</span> <span class="n">StreamTask</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span> + <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> + <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// process message synchronously</span> + <span class="o">}</span> + <span class="o">}</span> + + + <span class="nd">@Test</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">testBadPageViewFilterTaskApplication</span><span class="o">()</span> <span class="o">{</span> + <span class="n">List</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">badPageViews</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">generatePageViews</span><span class="o">(..));</span> + <span class="n">List</span><span class="o"><</span><span class="n">PageView</span><span
class="o">></span> <span class="n">expectedGoodPageViews</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">generatePageViews</span><span class="o">(..));</span> + + <span class="n">InMemorySystemDescriptor</span> <span class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">);</span> + + <span class="n">InMemoryInputDescriptor</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViewInput</span> <span class="o">=</span> <span class="n">inMemory</span> + <span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"pageViewEvent"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><>());</span> + + <span class="n">InMemoryOutputDescriptor</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViewOutput</span> <span class="o">=</span> <span class="n">inMemory</span> + <span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"goodPageViewEvent"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><>());</span> + + <span class="n">TestRunner</span> + <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">BadPageViewFilter</span><span class="o">())</span> + <span class="o">.</span><span class="na">addInputStream</span><span class="o">(</span><span class="n">pageViewInput</span><span class="o">,</span> <span class="n">badPageViews</span><span class="o">)</span> + <span class="o">.</span><span class="na">addOutputStream</span><span class="o">(</span><span 
class="n">pageViewOutput</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> + <span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">2</span><span class="o">));</span> + + <span class="n">StreamAssert</span><span class="o">.</span><span class="na">containsInOrder</span><span class="o">(</span><span class="n">expectedGoodPageViews</span><span class="o">,</span> <span class="n">pageViewOutput</span><span class="o">,</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1000</span><span class="o">));</span> + <span class="o">}</span></code></pre></figure> + +<p>Follow a similar approach for Legacy Low Level API, just provide the classname +(class implementing StreamTask or AsyncStreamTask) to TestRunner</p> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="kd">public</span> <span class="kd">class</span> <span class="nc">MultiplyByTenStreamTask</span> <span class="kd">implements</span> <span class="n">StreamTask</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span> <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> + <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + <span class="n">Integer</span> <span class="n">obj</span> <span class="o">=</span> <span class="o">(</span><span class="n">Integer</span><span class="o">)</span> <span 
class="n">envelope</span><span class="o">.</span><span class="na">getMessage</span><span class="o">();</span> + <span class="n">collector</span><span class="o">.</span><span class="na">send</span><span class="o">(</span><span class="k">new</span> <span class="n">OutgoingMessageEnvelope</span><span class="o">(</span><span class="k">new</span> <span class="n">SystemStream</span><span class="o">(</span><span class="s">"test"</span><span class="o">,</span> <span class="s">"output"</span><span class="o">),</span> + <span class="n">envelope</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="n">envelope</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="n">obj</span> <span class="o">*</span> <span class="mi">10</span><span class="o">));</span> + <span class="o">}</span> + <span class="o">}</span> + + <span class="nd">@Test</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">testLowLevelApi</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + <span class="n">List</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">inputList</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span> <span class="mi">5</span><span class="o">);</span> + <span class="n">List</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">outputList</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> 
<span class="mi">20</span><span class="o">,</span> <span class="mi">30</span><span class="o">,</span> <span class="mi">40</span><span class="o">,</span> <span class="mi">50</span><span class="o">);</span> + + <span class="n">InMemorySystemDescriptor</span> <span class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span class="s">"test"</span><span class="o">);</span> + + <span class="n">InMemoryInputDescriptor</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">numInput</span> <span class="o">=</span> <span class="n">inMemory</span> + <span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"input"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><</span><span class="n">Integer</span><span class="o">>());</span> + + <span class="n">InMemoryOutputDescriptor</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">numOutput</span> <span class="o">=</span> <span class="n">inMemory</span> + <span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"output"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><</span><span class="n">Integer</span><span class="o">>());</span> + + <span class="n">TestRunner</span> + <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">MultiplyByTenStreamTask</span><span class="o">.</span><span class="na">class</span><span class="o">)</span> + <span class="o">.</span><span class="na">addInputStream</span><span class="o">(</span><span class="n">numInput</span><span class="o">,</span> <span class="n">inputList</span><span class="o">)</span> + <span class="o">.</span><span class="na">addOutputStream</span><span
class="o">(</span><span class="n">numOutput</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> + <span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">1</span><span class="o">));</span> + + <span class="n">Assert</span><span class="o">.</span><span class="na">assertThat</span><span class="o">(</span><span class="n">TestRunner</span><span class="o">.</span><span class="na">consumeStream</span><span class="o">(</span><span class="n">numOutput</span><span class="o">,</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1000</span><span class="o">)).</span><span class="na">get</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> + <span class="n">IsIterableContainingInOrder</span><span class="o">.</span><span class="na">contains</span><span class="o">(</span><span class="n">outputList</span><span class="o">.</span><span class="na">toArray</span><span class="o">()));</span> + <span class="o">}</span></code></pre></figure> + +<h2 id="stateful-testing">Stateful Testing</h2> + +<ol> +<li>No additional config or changes to the TestRunner APIs are required for testing Samza jobs written with the StreamApplication or TaskApplication APIs.</li> +<li>The legacy task API only supports RocksDbTable, and needs the following configs to be added to TestRunner. 
+For example, if your job uses a RocksDbTable named “my-store” with String key and message serdes:</li> +</ol> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">config</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><>();</span> + <span class="n">config</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"stores.my-store.factory"</span><span class="o">,</span> <span class="s">"org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"</span><span class="o">);</span> + <span class="n">config</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"serializers.registry.string.class"</span><span class="o">,</span> <span class="s">"org.apache.samza.serializers.StringSerdeFactory"</span><span class="o">);</span> + <span class="n">config</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"stores.my-store.key.serde"</span><span class="o">,</span> <span class="s">"string"</span><span class="o">);</span> + <span class="n">config</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"stores.my-store.msg.serde"</span><span class="o">,</span> <span class="s">"string"</span><span class="o">);</span> + + <span class="n">TestRunner</span> + <span class="o">.</span><span class="na">of</span><span class="o">(...)</span> + <span class="o">.</span><span class="na">addConfig</span><span class="o">(</span><span class="n">config</span><span class="o">)</span> + <span class="o">...</span> + </code></pre></figure> + + + </div> + </div> + + </div> + + + <!-- footer starts here --> + + <!-- + Licensed to the Apache Software Foundation 
(ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<footer> + <div class="footer-inner"> + <div class="side-by-side"> + <div> + <div class="footer__heading">Learn More</div> + <div class="footer__items"> + <a class="footer__item" href="/meetups/">Meetups</a> + <a class="footer__item" href="/blog/">Blog</a> + <a class="footer__item" href="/learn/documentation/latest/introduction/background.html">About</a> + </div> + </div> + <div> + <div class="footer__heading">Community</div> + <div class="footer__items"> + <a class="footer__item" href="/community/contact-us.html">Contact Us</a> + <a class="footer__item" href="/contribute/contributors-corner.html">Contributors' Corner</a> + <a class="footer__item" href="/community/committers.html">PMC members and committers</a> + <a class="footer__item" href="/powered-by/">Powered By</a> + </div> + </div> + + <div> + <div class="quick-links"> + <a class="quick-link" href="/startup/download" target="_blank"> + <i class="icon ion-md-download"></i> + </a> + <a class="quick-link" href="https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tree" target="_blank"> + <i class="icon ion-md-code"></i> + </a> + <a class="quick-link" href="https://twitter.com/samzastream" target="_blank"> + <i class="icon ion-logo-twitter"></i> + </a> + </div> + + <p> + 
<script>document.write(new Date().getFullYear());</script> © samza.apache.org</p> + </div> + + </div> + </div> + +</footer> + + +<script> + /* Requests `url` and reports the outcome through `cb`: cb(true) when the response status is anything other than 404, cb(false) on a 404 or when the request itself fails. */ + var tryFile = function (url, cb) { + var myRequest = new Request(url); + /* A .then() fulfillment handler receives only the response; `cb` must come from the enclosing scope, not be redeclared as a second parameter (which would leave it undefined). */ + fetch(myRequest).then(function (response) { + console.log(response.status); + cb(response.status !== 404); + }, function () { + /* Network error or blocked request: report the file as unavailable instead of leaving an unhandled rejection. */ + cb(false); + }); + }; + + tryFile(window.location.pathname, function (status) { + // do something with the status + console.log(status); + }); +</script> + + +<!-- Google Analytics --> +<script> + (function (i, s, o, g, r, a, m) { + i['GoogleAnalyticsObject'] = r; i[r] = i[r] || function () { + (i[r].q = i[r].q || []).push(arguments) + }, i[r].l = 1 * new Date(); a = s.createElement(o), + m = s.getElementsByTagName(o)[0]; a.async = 1; a.src = g; m.parentNode.insertBefore(a, m) + })(window, document, 'script', 'https://www.google-analytics.com/analytics.js', 'ga'); + + ga('create', 'UA-43122768-1', 'apache.org'); + ga('send', 'pageview'); + +</script> +<script src="/js/main.new.js"></script> + +</body> + +</html> \ No newline at end of file
Modified: samza/site/learn/documentation/versioned/connectors/eventhubs.html URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/versioned/connectors/eventhubs.html?rev=1855804&r1=1855803&r2=1855804&view=diff ============================================================================== --- samza/site/learn/documentation/versioned/connectors/eventhubs.html (original) +++ samza/site/learn/documentation/versioned/connectors/eventhubs.html Tue Mar 19 05:31:11 2019 @@ -587,7 +587,7 @@ <p>The Samza EventHubs connector provides access to <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features">Azure EventHubs</a>, Microsoft’s data streaming service on Azure. An eventhub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data">EventData</a>. </p> -<p>The <a href="https://github.com/apache/samza-hello-samza">hello-samza</a> project includes an <a href="../../../tutorials/versioned/samza-event-hubs-standalone.md">example</a> of reading and writing to EventHubs.</p> +<p>The <a href="https://github.com/apache/samza-hello-samza">hello-samza</a> project includes an <a href="../../../tutorials/versioned/samza-event-hubs-standalone.html">example</a> of reading and writing to EventHubs.</p> <h3 id="concepts">Concepts</h3> Modified: samza/site/learn/documentation/versioned/connectors/hdfs.html URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/versioned/connectors/hdfs.html?rev=1855804&r1=1855803&r2=1855804&view=diff ============================================================================== --- samza/site/learn/documentation/versioned/connectors/hdfs.html (original) +++ samza/site/learn/documentation/versioned/connectors/hdfs.html Tue Mar 19 05:31:11 2019 @@ -612,12 +612,10 @@ To interact with HDFS, Samza requires yo <h4 
id="defining-streams">Defining streams</h4> -<p>Samza uses the notion of a <em>system</em> to describe any I/O source it interacts with. To consume from HDFS, you should create a new system that points to - <code>HdfsSystemFactory</code>. You can then associate multiple streams with this <em>system</em>. Each stream should have a <em>physical name</em>, which should be set to the name of the directory on HDFS.</p> +<p>In the Samza high-level API, you can use <code>HdfsSystemDescriptor</code> to create an HDFS system. The stream name should be set to the name of the directory on HDFS.</p> -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.hdfs.samza.factory</span><span class="o">=</span><span class="s">org.apache.samza.system.hdfs.HdfsSystemFactory</span> - -<span class="na">streams.hdfs-clickstream.samza.system</span><span class="o">=</span><span class="s">hdfs</span> -<span class="na">streams.hdfs-clickstream.samza.physical.name</span><span class="o">=</span><span class="s">hdfs:/data/clickstream/2016/09/11</span></code></pre></figure> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">);</span> +<span class="n">HdfsInputDescriptor</span> <span class="n">hid</span> <span class="o">=</span> <span class="n">hsd</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"/data/clickstream/2016/09/11"</span><span class="o">);</span></code></pre></figure> <p>The above example defines an HDFS system called <code>hdfs-clickstream</code> and an input stream that reads data from the <code>/data/clickstream/2016/09/11</code> directory. 
</p> @@ -625,8 +623,9 @@ To interact with HDFS, Samza requires yo <p>If you only want to consume from files that match a certain pattern, you can configure a whitelist. Likewise, you can also blacklist consuming from certain files. When both are specified, the <em>whitelist</em> selects the files to be filtered and the <em>blacklist</em> is later applied on its results. </p> -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.hdfs.partitioner.defaultPartitioner.whitelist</span><span class="o">=</span><span class="s">.*avro</span> -<span class="na">systems.hdfs.partitioner.defaultPartitioner.blacklist</span><span class="o">=</span><span class="s">somefile.avro</span></code></pre></figure> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withConsumerWhiteList</span><span class="o">(</span><span class="s">".*avro"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withConsumerBlackList</span><span class="o">(</span><span class="s">"somefile.avro"</span><span class="o">);</span></code></pre></figure> <h3 id="producing-to-hdfs">Producing to HDFS</h3> @@ -634,27 +633,27 @@ To interact with HDFS, Samza requires yo <p>Samza allows writing your output results to HDFS in AVRO format. You can either use avro’s GenericRecords or have Samza automatically infer the schema for your object using reflection. 
</p> -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c"># set the SystemFactory implementation to instantiate HdfsSystemProducer aliased to 'hdfs'</span> -<span class="na">systems.hdfs.samza.factory</span><span class="o">=</span><span class="s">org.apache.samza.system.hdfs.HdfsSystemFactory</span> -<span class="na">systems.hdfs.producer.hdfs.writer.class</span><span class="o">=</span><span class="s">org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter</span></code></pre></figure> - -<p>If your output is non-avro, you can describe its format by implementing your own serializer.</p> - -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.hdfs.producer.hdfs.writer.class</span><span class="o">=</span><span class="s">org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter</span> -<span class="na">serializers.registry.my-serde-name.class</span><span class="o">=</span><span class="s">MySerdeFactory</span> -<span class="na">systems.hdfs.samza.msg.serde</span><span class="o">=</span><span class="s">my-serde-name</span></code></pre></figure> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withWriterClassName</span><span class="o">(</span><span class="n">AvroDataFileHdfsWriter</span><span class="o">.</span><span class="na">class</span><span class="o">.</span><span class="na">getName</span><span class="o">());</span></code></pre></figure> + +<p>If your output is non-avro, use <code>TextSequenceFileHdfsWriter</code>.</p> + +<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withWriterClassName</span><span class="o">(</span><span class="n">TextSequenceFileHdfsWriter</span><span class="o">.</span><span class="na">class</span><span class="o">.</span><span class="na">getName</span><span class="o">());</span></code></pre></figure> <h4 id="output-directory-structure">Output directory structure</h4> <p>Samza allows you to control the base HDFS directory to write your output. You can also organize the output into sub-directories depending on the time your application ran, by configuring a date-formatter. </p> -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.hdfs.producer.hdfs.base.output.dir</span><span class="o">=</span><span class="s">/user/me/analytics/clickstream_data</span> -<span class="na">systems.hdfs.producer.hdfs.bucketer.date.path.format</span><span class="o">=</span><span class="s">yyyy_MM_dd</span></code></pre></figure> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withOutputBaseDir</span><span class="o">(</span><span class="s">"/user/me/analytics/clickstream_data"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withDatePathFormat</span><span class="o">(</span><span class="s">"yyyy_MM_dd"</span><span class="o">);</span></code></pre></figure> <p>You can configure the 
maximum size of each file or the maximum number of records per file. Once either limit has been reached, Samza will create a new file.</p> -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.hdfs.producer.hdfs.write.batch.size.bytes</span><span class="o">=</span><span class="s">134217728</span> -<span class="na">systems.hdfs.producer.hdfs.write.batch.size.records</span><span class="o">=</span><span class="s">10000</span></code></pre></figure> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withWriteBatchSizeBytes</span><span class="o">(</span><span class="mi">134217728</span><span class="o">)</span> + <span class="o">.</span><span class="na">withWriteBatchSizeRecords</span><span class="o">(</span><span class="mi">10000</span><span class="o">);</span></code></pre></figure> <h3 id="security">Security</h3> Modified: samza/site/learn/documentation/versioned/connectors/kinesis.html URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/versioned/connectors/kinesis.html?rev=1855804&r1=1855803&r2=1855804&view=diff ============================================================================== --- samza/site/learn/documentation/versioned/connectors/kinesis.html (original) +++ samza/site/learn/documentation/versioned/connectors/kinesis.html Tue Mar 19 05:31:11 2019 @@ -600,21 +600,15 @@ wraps the Record into a <a href="https:/ <h4 id="basic-configuration">Basic Configuration</h4> -<p>Here is the required configuration for consuming messages from Kinesis.
</p> +<p>Here is the required configuration for consuming messages from Kinesis, through <code>KinesisSystemDescriptor</code> and <code>KinesisInputDescriptor</code>. </p> -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c">// Define a Kinesis system factory with your identifier. eg: kinesis-system</span> -<span class="na">systems.kinesis-system.samza.factory</span><span class="o">=</span><span class="s">org.apache.samza.system.kinesis.KinesisSystemFactory</span> - -<span class="c">// Kinesis consumer works with only AllSspToSingleTaskGrouperFactory</span> -<span class="na">job.systemstreampartition.grouper.factory</span><span class="o">=</span><span class="s">org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory</span> - -<span class="c">// Define your streams</span> -<span class="na">task.inputs</span><span class="o">=</span><span class="s">kinesis-system.input0</span> - -<span class="c">// Define required properties for your streams</span> -<span class="na">systems.kinesis-system.streams.input0.aws.region</span><span class="o">=</span><span class="s">YOUR-STREAM-REGION</span> -<span class="na">systems.kinesis-system.streams.input0.aws.accessKey</span><span class="o">=</span><span class="s">YOUR-ACCESS_KEY</span> -<span class="na">sensitive.systems.kinesis-system.streams.input0.aws.secretKey</span><span class="o">=</span><span class="s">YOUR-SECRET-KEY</span></code></pre></figure> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">KinesisSystemDescriptor</span> <span class="n">ksd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KinesisSystemDescriptor</span><span class="o">(</span><span class="s">"kinesis"</span><span class="o">);</span> + +<span class="n">KinesisInputDescriptor</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span 
class="o">,</span> <span class="kt">byte</span><span class="o">[]>></span> <span class="n">kid</span> <span class="o">=</span> + <span class="n">ksd</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"STREAM-NAME"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><</span><span class="kt">byte</span><span class="o">[]>())</span> + <span class="o">.</span><span class="na">withRegion</span><span class="o">(</span><span class="s">"STREAM-REGION"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withAccessKey</span><span class="o">(</span><span class="s">"YOUR-ACCESS_KEY"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withSecretKey</span><span class="o">(</span><span class="s">"YOUR-SECRET-KEY"</span><span class="o">);</span></code></pre></figure> <h4 id="coordination">Coordination</h4> @@ -627,9 +621,11 @@ set your <code>grouper</code> configurat <p>Each Kinesis stream in a given AWS <a href="https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html">region</a> can be accessed by providing an <a href="https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys">access key</a>. An Access key consists of two parts: an access key ID (for example, <code>AKIAIOSFODNN7EXAMPLE</code>) and a secret access key (for example, <code>wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY</code>) which you can use to send programmatic requests to AWS. 
</p> -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.kinesis-system.streams.input0.aws.region</span><span class="o">=</span><span class="s">YOUR-STREAM-REGION</span> -<span class="na">systems.kinesis-system.streams.input0.aws.accessKey</span><span class="o">=</span><span class="s">YOUR-ACCESS_KEY</span> -<span class="na">sensitive.systems.kinesis-system.streams.input0.aws.secretKey</span><span class="o">=</span><span class="s">YOUR-SECRET-KEY</span></code></pre></figure> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">KinesisInputDescriptor</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]>></span> <span class="n">kid</span> <span class="o">=</span> + <span class="n">ksd</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"STREAM-NAME"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><</span><span class="kt">byte</span><span class="o">[]>())</span> + <span class="o">.</span><span class="na">withRegion</span><span class="o">(</span><span class="s">"STREAM-REGION"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withAccessKey</span><span class="o">(</span><span class="s">"YOUR-ACCESS_KEY"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withSecretKey</span><span class="o">(</span><span class="s">"YOUR-SECRET-KEY"</span><span class="o">);</span></code></pre></figure> <h3 id="advanced-configuration">Advanced Configuration</h3> @@ -637,25 +633,40 @@ set your <code>grouper</code> configurat <p>Samza Kinesis Connector uses the <a 
href="https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl">Kinesis Client Library</a> (KCL) to access the Kinesis data streams. You can set any <a href="https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java">KCL Configuration</a> -for a stream by configuring it with the <strong>systems.system-name.streams.stream-name.aws.kcl.</strong>* prefix.</p> +for a stream by configuring it through <code>KinesisInputDescriptor</code>.</p> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">KinesisInputDescriptor</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]>></span> <span class="n">kid</span> <span class="o">=</span> <span class="o">...</span> -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM</span><span class="o">=</span><span class="s">CONFIG-VALUE</span></code></pre></figure> +<span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">kclConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><>();</span> +<span class="n">kclConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"CONFIG-PARAM"</span><span class="o">,</span> <span class="s">"CONFIG-VALUE"</span><span class="o">);</span> + +<span class="n">kid</span><span class="o">.</span><span class="na">withKCLConfig</span><span class="o">(</span><span class="n">kclConfig</span><span
class="o">);</span></code></pre></figure> <p>As an example, the configuration below is equivalent to invoking <code>kclClient#withTableName(myTable)</code> on the KCL instance.</p> -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.system-name.streams.stream-name.aws.kcl.TableName</span><span class="o">=</span><span class="s">myTable</span></code></pre></figure> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">KinesisInputDescriptor</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]>></span> <span class="n">kid</span> <span class="o">=</span> <span class="o">...</span> + +<span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">kclConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><>();</span> +<span class="n">kclConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"TableName"</span><span class="o">,</span> <span class="s">"myTable"</span><span class="o">);</span> + +<span class="n">kid</span><span class="o">.</span><span class="na">withKCLConfig</span><span class="o">(</span><span class="n">kclConfig</span><span class="o">);</span></code></pre></figure> <h4 id="aws-client-configs">AWS Client configs</h4> <p>Samza allows you to specify any <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">AWS client configs</a> to connect to your Kinesis instance.
-You can configure any <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">AWS client configuration</a> with the <code>systems.your-system-name.aws.clientConfig.*</code> prefix.</p> +You can configure any <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">AWS client configuration</a> through <code>KinesisSystemDescriptor</code>.</p> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">awsConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><>();</span> +<span class="n">awsConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"CONFIG-PARAM"</span><span class="o">,</span> <span class="s">"CONFIG-VALUE"</span><span class="o">);</span> -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.system-name.aws.clientConfig.CONFIG-PARAM</span><span class="o">=</span><span class="s">CONFIG-VALUE</span></code></pre></figure> +<span class="n">KinesisSystemDescriptor</span> <span class="n">sd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KinesisSystemDescriptor</span><span class="o">(</span><span class="n">systemName</span><span class="o">)</span> + <span class="o">.</span><span class="na">withAWSConfig</span><span class="o">(</span><span class="n">awsConfig</span><span class="o">);</span></code></pre></figure> -<p>As an example, to set the <em>proxy host</em> and <em>proxy port</em> to be used by the Kinesis Client:</p> +<p>Through <code>KinesisSystemDescriptor</code> you can also set the <em>proxy host</em> and <em>proxy port</em> to be used by the Kinesis
Client:</p> -<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.system-name.aws.clientConfig.ProxyHost</span><span class="o">=</span><span class="s">my-proxy-host.com</span> -<span class="na">systems.system-name.aws.clientConfig.ProxyPort</span><span class="o">=</span><span class="s">my-proxy-port</span></code></pre></figure> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">KinesisSystemDescriptor</span> <span class="n">sd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KinesisSystemDescriptor</span><span class="o">(</span><span class="n">systemName</span><span class="o">)</span> + <span class="o">.</span><span class="na">withProxyHost</span><span class="o">(</span><span class="s">"YOUR-PROXY-HOST"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withProxyPort</span><span class="o">(</span><span class="n">YOUR_PROXY_PORT</span><span class="o">);</span></code></pre></figure> <h3 id="resetting-offsets">Resetting Offsets</h3> @@ -663,12 +674,30 @@ You can configure any <a href="http://do These checkpoints are stored and managed by the KCL library internally. You can reset the checkpoints by configuring a different name for the DynamoDB table.
</p> <figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c">// change the TableName to a unique name to reset checkpoints.</span> -<span class="na">systems.kinesis-system.streams.input0.aws.kcl.TableName</span><span class="o">=</span><span class="s">my-app-table-name</span></code></pre></figure> +<span class="na">systems.kinesis-system.streams.STREAM-NAME.aws.kcl.TableName</span><span class="o">=</span><span class="s">my-app-table-name</span></code></pre></figure> + +<p>Or through <code>KinesisInputDescriptor</code>:</p> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">KinesisInputDescriptor</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]>></span> <span class="n">kid</span> <span class="o">=</span> <span class="o">...</span> + +<span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">kclConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><>();</span> +<span class="n">kclConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"TableName"</span><span class="o">,</span> <span class="s">"my-new-app-table-name"</span><span class="o">);</span> + +<span class="n">kid</span><span class="o">.</span><span class="na">withKCLConfig</span><span class="o">(</span><span class="n">kclConfig</span><span class="o">);</span></code></pre></figure> <p>When you reset checkpoints, you can configure your job to start consuming from either the earliest or latest offset in the stream.
</p> <figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c">// set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)</span> -<span class="na">systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream</span><span class="o">=</span><span class="s">LATEST</span></code></pre></figure> +<span class="na">systems.kinesis-system.streams.STREAM-NAME.aws.kcl.InitialPositionInStream</span><span class="o">=</span><span class="s">LATEST</span></code></pre></figure> + +<p>Or through <code>KinesisInputDescriptor</code>:</p> + +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">KinesisInputDescriptor</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]>></span> <span class="n">kid</span> <span class="o">=</span> <span class="o">...</span> + +<span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">kclConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><>();</span> +<span class="n">kclConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"InitialPositionInStream"</span><span class="o">,</span> <span class="s">"LATEST"</span><span class="o">);</span> + +<span class="n">kid</span><span class="o">.</span><span class="na">withKCLConfig</span><span class="o">(</span><span class="n">kclConfig</span><span class="o">);</span></code></pre></figure> <p>Alternatively, if you want to start from a particular offset in the Kinesis stream, you can log in to the <a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html">AWS console</a> and edit the offsets in your
DynamoDB Table. By default, the table-name has the following format: “<job name>-<job id>-<kinesis stream>”.</p>
