Modified: flink/site/docs/0.6-incubating/example_connectors.html URL: http://svn.apache.org/viewvc/flink/site/docs/0.6-incubating/example_connectors.html?rev=1657551&r1=1657550&r2=1657551&view=diff ============================================================================== --- flink/site/docs/0.6-incubating/example_connectors.html (original) +++ flink/site/docs/0.6-incubating/example_connectors.html Thu Feb 5 12:21:38 2015 @@ -5,109 +5,131 @@ <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1"> <title>Apache Flink (incubating): Example: Connectors</title> - <link rel="stylesheet" href="/css/bootstrap.css"> - <link rel="stylesheet" href="/css/bootstrap-lumen-custom.css"> - <link rel="stylesheet" href="/css/syntax.css"> - <link rel="/css/custom.css"> - <link rel="css/codetabs.css"> - <link href="//maxcdn.bootstrapcdn.com/font-awesome/4.1.0/css/font-awesome.min.css" rel="stylesheet"> + <link rel="shortcut icon" href="favicon.ico" type="image/x-icon"> + <link rel="icon" href="favicon.ico" type="image/x-icon"> + <link rel="stylesheet" href="css/bootstrap.css"> + <link rel="stylesheet" href="css/bootstrap-lumen-custom.css"> + <link rel="stylesheet" href="css/syntax.css"> + <link rel="stylesheet" href="css/custom.css"> + <link href="css/main/main.css" rel="stylesheet"> <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.0/jquery.min.js"></script> - <script src="/js/bootstrap.min.js"></script> + <script src="js/bootstrap.min.js"></script> <script src="js/codetabs.js"></script> </head> <body> <nav class="navbar navbar-default navbar-fixed-top" role="navigation"> <div class="container"> - <div class="navbar-header"> - <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse"> - <span class="sr-only">Toggle navigation</span> - <span class="icon-bar"></span> - <span class="icon-bar"></span> - <span class="icon-bar"></span> - </button> - <a class="navbar-brand" href="http://flink.incubator.apache.org">Apache Flink</a> - </div> - - <div class="collapse navbar-collapse" id="navbar-collapse-1"> - <ul class="nav navbar-nav"> + <div class="row"> + <div class="col-md-1 af-mobile-nav-bar"> + <a href="index.html" title="Home"> + <img class="hidden-xs hidden-sm img-responsive" + src="img/logo.png" alt="Apache Flink Logo"> + </a> + <div class="row visible-xs"> + <div class="col-xs-3"> + <a href="index.html" title="Home"> + <img class="hidden-x hidden-sm img-responsive" + src="img/logo.png" alt="Apache Flink Logo"> + </a> + </div> + <div class="col-xs-5"></div> + <div class="col-xs-4"> + <div class="af-mobile-btn"> + <span class="glyphicon glyphicon-plus"></span> + </div> + </div> + </div> + </div> + <!-- Navigation --> + <div class="col-md-11"> + <div class="collapse navbar-collapse" id="navbar-collapse-1"> + <ul class="nav navbar-nav"> + + <li> + <a href="index.html" class="">Documentation</a> + </li> + + <li> + <a href="api/java/index.html">Javadoc</a> + </li> + + <li> + <a href="api/scala/index.html#org.apache.flink.api.scala.package">Scaladoc</a> + </li> - <li> - <a href="index.html" class="">Documentation</a> - </li> - - <li> - <a href="api/java/index.html">Javadoc</a> - </li> - </ul> + </ul> + </div> + </div> </div> </div> </nav> - <div style="padding-top:70px" class="container"> + + <div style="padding-top:120px" class="container"> <div class="row"> <div class="col-md-3"> <ul> - <li>Quickstart - <ul> - <li><a href="setup_quickstart.html">Install</a></li> - <li><a 
href="run_example_quickstart.html">Run Example</a></li> - <li><a href="java_api_quickstart.html">Java API</a></li> - <li><a href="scala_api_quickstart.html">Scala API</a></li> - <li><a href="faq.html">FAQ</a></li> - </ul> - </li> - - <li>Setup & Configuration - <ul> - <li><a href="building.html">Build Instructions</a></li> - <li><a href="local_setup.html">Local Setup</a></li> - <li><a href="cluster_setup.html">Cluster Setup</a></li> - <li><a href="yarn_setup.html">YARN Setup</a></li> - <li><a href="config.html">Configuration</a></li> - </ul> - </li> - - <li>Programming Guides - <ul> - <li><a href="java_api_guide.html">Java API</a></li> - <li><a href="java_api_transformations.html">Java API Transformations</a></li> - <li><a href="scala_api_guide.html">Scala API</a></li> - <li><a href="iterations.html">Iterations</a></li> - <li><a href="spargel_guide.html">Spargel Graph API</a></li> - </ul> - </li> - - <li>Examples - <ul> - <li><a href="java_api_examples.html">Java API</a></li> - <li><a href="scala_api_examples.html">Scala API</a></li> - <li><a href="example_connectors.html">Connecting to other systems</a></li> - </ul> - </li> - - <li>Execution - <ul> - <li><a href="local_execution.html">Local/Debugging</a></li> - <li><a href="cluster_execution.html">Cluster</a></li> - <li><a href="cli.html">Command-Line Interface</a></li> - <li><a href="web_client.html">Web Interface</a></li> - </ul> - </li> - - <li>Internals - <ul> - <li><a href="internal_overview.html">Overview</a></li> - <li><a href="internal_general_arch.html">General Architecture</a></li> - <li><a href="internal_add_operator.html">How-to: Adding a new Operator</a></li> - </ul> - </li> + <li><a href="faq.html">FAQ</a></li> + <li>Quickstart + <ul> + <li><a href="setup_quickstart.html">Setup</a></li> + <li><a href="run_example_quickstart.html">Run Example</a></li> + <li><a href="java_api_quickstart.html">Java API</a></li> + <li><a href="scala_api_quickstart.html">Scala API</a></li> + </ul> + </li> + + <li>Setup & Configuration + <ul> + <li><a href="local_setup.html">Local Setup</a></li> + <li><a href="building.html">Build Flink</a></li> + <li><a href="cluster_setup.html">Cluster Setup</a></li> + <li><a href="yarn_setup.html">YARN Setup</a></li> + <li><a href="config.html">Configuration</a></li> + </ul> + </li> + + <li>Programming Guides + <ul> + <li><a href="programming_guide.html">Programming Guide</a></li> + <li><a href="dataset_transformations.html">DataSet Transformations</a></li> + <li><a href="java8_programming_guide.html">Java 8 Programming Guide</a></li> + <li><a href="streaming_guide.html">Streaming Guide</a></li> + <li><a href="iterations.html">Iterations</a></li> + <li><a href="spargel_guide.html">Spargel Graph API</a></li> + <li><a href="hadoop_compatibility.html">Hadoop Compatibility</a></li> + </ul> + </li> + + <li>Examples + <ul> + <li><a href="examples.html">Bundled Examples</a></li> + <li><a href="example_connectors.html">Connecting to other systems</a></li> + </ul> + </li> + + <li>Execution + <ul> + <li><a href="local_execution.html">Local/Debugging</a></li> + <li><a href="cluster_execution.html">Cluster</a></li> + <li><a href="cli.html">Command-Line Interface</a></li> + <li><a href="web_client.html">Web Interface</a></li> + </ul> + </li> + + <li>Internals + <ul> + <li><a href="internal_overview.html">Overview</a></li> + </ul> + </li> </ul> + </div> <div class="col-md-9"> <h1>Example: Connectors</h1> - + <p>Apache Flink allows users to access many different systems as data sources or sinks. 
The system is designed to be easily extensible. Similar to Apache Hadoop, Flink has the concept of so-called <code>InputFormat</code>s and <code>OutputFormat</code>s.</p> <p>One implementation of these <code>InputFormat</code>s is the <code>HadoopInputFormat</code>. This is a wrapper that allows users to use all existing Hadoop input formats with Flink.</p> @@ -118,44 +140,52 @@ <p><em>Note: This example works starting from Flink 0.6-incubating</em></p> -<p>This example is using the <code>HadoopInputFormat</code> wrapper to use an existing Hadoop input format implementation for accessing <a href="https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/">Azure's Table Storage</a>.</p> +<p>This example uses the <code>HadoopInputFormat</code> wrapper with an existing Hadoop input format implementation to access <a href="https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/">Azure's Table Storage</a>.</p> <ol> -<li><p>Download and compile the <code>azure-tables-hadoop</code> project. The input format developed by the project is not yet available in Maven Central, therefore, we have to build the project ourselves. + <li> + <p>Download and compile the <code>azure-tables-hadoop</code> project. The input format developed by the project is not yet available in Maven Central; therefore, we have to build the project ourselves. Execute the following commands:</p> + + <div class="highlight"><pre><code class="language-bash">git clone https://github.com/mooso/azure-tables-hadoop.git <span class="nb">cd </span>azure-tables-hadoop -mvn clean install -</code></pre></div></li> -<li><p>Setup a new Flink project using the quickstarts:</p> -<div class="highlight"><pre><code class="language-bash" data-lang="bash">curl https://raw.githubusercontent.com/apache/incubator-flink/master/flink-quickstart/quickstart.sh <span class="p">|</span> bash -</code></pre></div></li> -<li><p>Set the the version of Flink to <code>0.6.1-hadoop2-incubating</code> in the <code>pom.xml</code> file. The quickstart.sh script sets the version to the <code>hadoop1</code> version of Flink. Since the <code>microsoft-hadoop-azure</code> has been written for Hadoop 2.2 (mapreduce-API) version, we need to use the appropriate Flink version. </p> - -<p>Replace all occurences of <code><version>0.6.1-incubating</version></code> with <code><version>0.6.1-hadoop2-incubating</version></code>.</p></li> -<li><p>Add the following dependencies (in the <code><dependencies></code> section) to your <code>pom.xml</code> file:</p> -<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt"><dependency></span> +mvn clean install</code></pre></div> + </li> + <li> + <p>Set up a new Flink project using the quickstarts:</p> + + <div class="highlight"><pre><code class="language-bash">curl https://raw.githubusercontent.com/apache/incubator-flink/master/flink-quickstart/quickstart.sh <span class="p">|</span> bash</code></pre></div> + </li> + <li> + <p>Set the version of Flink to <code>0.7-hadoop2-incubating</code> in the <code>pom.xml</code> file. The quickstart.sh script sets the version to the <code>hadoop1</code> version of Flink. Since <code>microsoft-hadoop-azure</code> has been written for the Hadoop 2.2 (mapreduce API) version, we need to use the appropriate Flink version.
</p> + + <p>Replace all occurrences of <code><version>0.7.0-incubating</version></code> with <code><version>0.7-hadoop2-incubating</version></code>.</p> + </li> + <li> + <p>Add the following dependencies (in the <code><dependencies></code> section) to your <code>pom.xml</code> file:</p> + + <div class="highlight"><pre><code class="language-xml"><span class="nt"><dependency></span> <span class="nt"><groupId></span>org.apache.flink<span class="nt"></groupId></span> <span class="nt"><artifactId></span>flink-hadoop-compatibility<span class="nt"></artifactId></span> - <span class="nt"><version></span>0.6.1-hadoop2-incubating<span class="nt"></version></span> + <span class="nt"><version></span>0.7-hadoop2-incubating<span class="nt"></version></span> <span class="nt"></dependency></span> <span class="nt"><dependency></span> <span class="nt"><groupId></span>com.microsoft.hadoop<span class="nt"></groupId></span> <span class="nt"><artifactId></span>microsoft-hadoop-azure<span class="nt"></artifactId></span> <span class="nt"><version></span>0.0.4<span class="nt"></version></span> -<span class="nt"></dependency></span> -</code></pre></div> -<ul> -<li><code>flink-hadoop-compatibility</code> is a Flink package that provides the Hadoop input format wrappers.</li> -<li><code>microsoft-hadoop-azure</code> is adding the project we've build before to our project.</li> -</ul></li> +<span class="nt"></dependency></span></code></pre></div> + + <p><code>flink-hadoop-compatibility</code> is a Flink package that provides the Hadoop input format wrappers. +<code>microsoft-hadoop-azure</code> adds the project we built before to our project.</p> + </li> </ol> <p>The project is now prepared to start coding. We recommend importing the project into an IDE, such as Eclipse or IntelliJ (import it as a Maven project!). Browse to the code of the <code>Job.java</code> file.
It's an empty skeleton for a Flink job.</p> <p>Paste the following code into it:</p> + +<div class="highlight"><pre><code class="language-java"><span class="kn">import</span> <span class="nn">java.util.Map</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">org.apache.flink.api.common.functions.MapFunction</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">org.apache.flink.api.java.DataSet</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">org.apache.flink.api.java.ExecutionEnvironment</span><span class="o">;</span> @@ -169,18 +199,21 @@ Browse to the code of the <code>Job.java <span class="kn">import</span> <span class="nn">com.microsoft.windowsazure.storage.table.EntityProperty</span><span class="o">;</span> <span class="kd">public</span> <span class="kd">class</span> <span class="nc">AzureTableExample</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> <span class="c1">// set up the execution environment</span> <span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span> + <span class="c1">// create an AzureTableInputFormat, using a Hadoop input format wrapper</span> <span class="n">HadoopInputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">WritableEntity</span><span class="o">></span> <span class="n">hdIf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HadoopInputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">WritableEntity</span><span class="o">>(</span><span class="k">new</span> <span class="nf">AzureTableInputFormat</span><span class="o">(),</span> <span class="n">Text</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">WritableEntity</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Job</span><span class="o">());</span> + <span class="c1">// set the Account URI, something like: https://apacheflink.table.core.windows.net</span> <span class="n">hdIf</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="n">AzureTableConfiguration</span><span class="o">.</span><span class="na">Keys</span><span class="o">.</span><span class="na">ACCOUNT_URI</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="s">"TODO"</span><span class="o">);</span> <span class="c1">// set the secret storage key here</span> <span class="n">hdIf</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="n">AzureTableConfiguration</span><span
class="o">.</span><span class="na">Keys</span><span class="o">.</span><span class="na">STORAGE_KEY</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="s">"TODO"</span><span class="o">);</span> <span class="c1">// set the table name here</span> <span class="n">hdIf</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="n">AzureTableConfiguration</span><span class="o">.</span><span class="na">Keys</span><span class="o">.</span><span class="na">TABLE_NAME</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="s">"TODO"</span><span class="o">);</span> - + <span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">WritableEntity</span><span class="o">>></span> <span class="n">input</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">createInput</span><span class="o">(</span><span class="n">hdIf</span><span class="o">);</span> <span class="c1">// a little example how to use the data in a mapper.</span> <span class="n">DataSet</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">fin</span> <span class="o">=</span> <span class="n">input</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span><span class="n">WritableEntity</span><span class="o">>,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> @@ -188,20 +221,24 @@ Browse to the code of the <code>Job.java <span class="kd">public</span> <span class="n">String</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">WritableEntity</span><span class="o">></span> <span class="n">arg0</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> <span class="n">System</span><span class="o">.</span><span class="na">err</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"--------------------------------\nKey = "</span><span class="o">+</span><span class="n">arg0</span><span class="o">.</span><span class="na">f0</span><span class="o">);</span> <span class="n">WritableEntity</span> <span class="n">we</span> <span class="o">=</span> <span class="n">arg0</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span> + <span class="k">for</span><span class="o">(</span><span class="n">Map</span><span class="o">.</span><span class="na">Entry</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">EntityProperty</span><span class="o">></span> <span class="n">prop</span> <span class="o">:</span> <span class="n">we</span><span class="o">.</span><span class="na">getProperties</span><span class="o">().</span><span class="na">entrySet</span><span class="o">())</span> <span class="o">{</span> <span class="n">System</span><span class="o">.</span><span class="na">err</span><span class="o">.</span><span class="na">println</span><span 
class="o">(</span><span class="s">"key="</span><span class="o">+</span><span class="n">prop</span><span class="o">.</span><span class="na">getKey</span><span class="o">()</span> <span class="o">+</span> <span class="s">" ; value (asString)="</span><span class="o">+</span><span class="n">prop</span><span class="o">.</span><span class="na">getValue</span><span class="o">().</span><span class="na">getValueAsString</span><span class="o">());</span> <span class="o">}</span> + <span class="k">return</span> <span class="n">arg0</span><span class="o">.</span><span class="na">f0</span><span class="o">.</span><span class="na">toString</span><span class="o">();</span> <span class="o">}</span> <span class="o">});</span> + <span class="c1">// emit result (this works only locally)</span> <span class="n">fin</span><span class="o">.</span><span class="na">print</span><span class="o">();</span> + <span class="c1">// execute program</span> <span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">"Azure Example"</span><span class="o">);</span> <span class="o">}</span> -<span class="o">}</span> -</code></pre></div> -<p>The example shows how to access an Azure table and turn data into Flink's <code>DataSet</code> (more specifically, the type of the set is <code>DataSet<Tuple2<Text, WritableEntity>></code>). With the <code>DataSet</code>, you can apply all known transformations to the DataSet.</p> +<span class="o">}</span></code></pre></div> + +<p>The example shows how to access an Azure table and turn data into Flinkâs <code>DataSet</code> (more specifically, the type of the set is <code>DataSet<Tuple2<Text, WritableEntity>></code>). With the <code>DataSet</code>, you can apply all known transformations to the DataSet.</p> <h2 id="access-mongodb">Access MongoDB</h2> @@ -210,6 +247,8 @@ Browse to the code of the <code>Job.java <p>Please see this (slightly outdated) blogpost on <a href="http://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html">How to access MongoDB with Apache Flink</a>.</p> + + <!-- Disqus Area --> <div style="padding-top:30px" id="disqus_thread"></div> <script type="text/javascript"> @@ -224,14 +263,12 @@ Browse to the code of the <code>Job.java })(); </script> <noscript>Please enable JavaScript to view the <a href="http://disqus.com/?ref_noscript">comments powered by Disqus.</a></noscript> - <a href="http://disqus.com" class="dsq-brlink">comments powered by <span class="logo-disqus">Disqus</span></a> - </div> </div> <div class="footer"> - <p><hr class="divider"></p> + <hr class="divider" /> <p><small>Apache Flink is an effort undergoing incubation at The Apache Software Foundation (ASF), sponsored by the Apache Incubator PMC. Incubation is @@ -242,9 +279,10 @@ incubation status is not necessarily a r stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.</small></p> -<p><a href="http://incubator.apache.org/"><img src="/img/apache-incubator-logo.png" alt="Incubator Logo"></a></p> +<p><a href="http://incubator.apache.org/"><img src="/img/apache-incubator-logo.png" alt="Incubator Logo" /></a></p> -<p class="text-center"><a href="/privacy-policy.html">Privacy Policy<a></p> +<p class="text-center"><a href="privacy-policy.html">Privacy Policy<a> +</a></a></p> </div> </div>
Modified: flink/site/docs/0.6-incubating/faq.html URL: http://svn.apache.org/viewvc/flink/site/docs/0.6-incubating/faq.html?rev=1657551&r1=1657550&r2=1657551&view=diff ============================================================================== --- flink/site/docs/0.6-incubating/faq.html (original) +++ flink/site/docs/0.6-incubating/faq.html Thu Feb 5 12:21:38 2015 @@ -5,123 +5,180 @@ <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1"> <title>Apache Flink (incubating): Frequently Asked Questions (FAQ)</title> - <link rel="stylesheet" href="/css/bootstrap.css"> - <link rel="stylesheet" href="/css/bootstrap-lumen-custom.css"> - <link rel="stylesheet" href="/css/syntax.css"> - <link rel="/css/custom.css"> - <link rel="css/codetabs.css"> - <link href="//maxcdn.bootstrapcdn.com/font-awesome/4.1.0/css/font-awesome.min.css" rel="stylesheet"> + <link rel="shortcut icon" href="favicon.ico" type="image/x-icon"> + <link rel="icon" href="favicon.ico" type="image/x-icon"> + <link rel="stylesheet" href="css/bootstrap.css"> + <link rel="stylesheet" href="css/bootstrap-lumen-custom.css"> + <link rel="stylesheet" href="css/syntax.css"> + <link rel="stylesheet" href="css/custom.css"> + <link href="css/main/main.css" rel="stylesheet"> <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.0/jquery.min.js"></script> - <script src="/js/bootstrap.min.js"></script> + <script src="js/bootstrap.min.js"></script> <script src="js/codetabs.js"></script> </head> <body> <nav class="navbar navbar-default navbar-fixed-top" role="navigation"> <div class="container"> - <div class="navbar-header"> - <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse"> - <span class="sr-only">Toggle navigation</span> - <span class="icon-bar"></span> - <span class="icon-bar"></span> - <span class="icon-bar"></span> - </button> - <a class="navbar-brand" href="http://flink.incubator.apache.org">Apache Flink</a> - </div> - - <div class="collapse navbar-collapse" id="navbar-collapse-1"> - <ul class="nav navbar-nav"> + <div class="row"> + <div class="col-md-1 af-mobile-nav-bar"> + <a href="index.html" title="Home"> + <img class="hidden-xs hidden-sm img-responsive" + src="img/logo.png" alt="Apache Flink Logo"> + </a> + <div class="row visible-xs"> + <div class="col-xs-3"> + <a href="index.html" title="Home"> + <img class="hidden-x hidden-sm img-responsive" + src="img/logo.png" alt="Apache Flink Logo"> + </a> + </div> + <div class="col-xs-5"></div> + <div class="col-xs-4"> + <div class="af-mobile-btn"> + <span class="glyphicon glyphicon-plus"></span> + </div> + </div> + </div> + </div> + <!-- Navigation --> + <div class="col-md-11"> + <div class="collapse navbar-collapse" id="navbar-collapse-1"> + <ul class="nav navbar-nav"> + + <li> + <a href="index.html" class="">Documentation</a> + </li> + + <li> + <a href="api/java/index.html">Javadoc</a> + </li> + + <li> + <a href="api/scala/index.html#org.apache.flink.api.scala.package">Scaladoc</a> + </li> - <li> - <a href="index.html" class="">Documentation</a> - </li> - - <li> - <a href="api/java/index.html">Javadoc</a> - </li> - </ul> + </ul> + </div> + </div> </div> </div> </nav> - <div style="padding-top:70px" class="container"> + + <div style="padding-top:120px" class="container"> <div class="row"> <div class="col-md-3"> <ul> - <li>Quickstart - <ul> - <li><a href="setup_quickstart.html">Install</a></li> - <li><a href="run_example_quickstart.html">Run Example</a></li> - <li><a 
href="java_api_quickstart.html">Java API</a></li> - <li><a href="scala_api_quickstart.html">Scala API</a></li> - <li><a href="faq.html">FAQ</a></li> - </ul> - </li> - - <li>Setup & Configuration - <ul> - <li><a href="building.html">Build Instructions</a></li> - <li><a href="local_setup.html">Local Setup</a></li> - <li><a href="cluster_setup.html">Cluster Setup</a></li> - <li><a href="yarn_setup.html">YARN Setup</a></li> - <li><a href="config.html">Configuration</a></li> - </ul> - </li> - - <li>Programming Guides - <ul> - <li><a href="java_api_guide.html">Java API</a></li> - <li><a href="java_api_transformations.html">Java API Transformations</a></li> - <li><a href="scala_api_guide.html">Scala API</a></li> - <li><a href="iterations.html">Iterations</a></li> - <li><a href="spargel_guide.html">Spargel Graph API</a></li> - </ul> - </li> - - <li>Examples - <ul> - <li><a href="java_api_examples.html">Java API</a></li> - <li><a href="scala_api_examples.html">Scala API</a></li> - <li><a href="example_connectors.html">Connecting to other systems</a></li> - </ul> - </li> - - <li>Execution - <ul> - <li><a href="local_execution.html">Local/Debugging</a></li> - <li><a href="cluster_execution.html">Cluster</a></li> - <li><a href="cli.html">Command-Line Interface</a></li> - <li><a href="web_client.html">Web Interface</a></li> - </ul> - </li> - - <li>Internals - <ul> - <li><a href="internal_overview.html">Overview</a></li> - <li><a href="internal_general_arch.html">General Architecture</a></li> - <li><a href="internal_add_operator.html">How-to: Adding a new Operator</a></li> - </ul> - </li> + <li><a href="faq.html">FAQ</a></li> + <li>Quickstart + <ul> + <li><a href="setup_quickstart.html">Setup</a></li> + <li><a href="run_example_quickstart.html">Run Example</a></li> + <li><a href="java_api_quickstart.html">Java API</a></li> + <li><a href="scala_api_quickstart.html">Scala API</a></li> + </ul> + </li> + + <li>Setup & Configuration + <ul> + <li><a href="local_setup.html">Local Setup</a></li> + <li><a href="building.html">Build Flink</a></li> + <li><a href="cluster_setup.html">Cluster Setup</a></li> + <li><a href="yarn_setup.html">YARN Setup</a></li> + <li><a href="config.html">Configuration</a></li> + </ul> + </li> + + <li>Programming Guides + <ul> + <li><a href="programming_guide.html">Programming Guide</a></li> + <li><a href="dataset_transformations.html">DataSet Transformations</a></li> + <li><a href="java8_programming_guide.html">Java 8 Programming Guide</a></li> + <li><a href="streaming_guide.html">Streaming Guide</a></li> + <li><a href="iterations.html">Iterations</a></li> + <li><a href="spargel_guide.html">Spargel Graph API</a></li> + <li><a href="hadoop_compatibility.html">Hadoop Compatibility</a></li> + </ul> + </li> + + <li>Examples + <ul> + <li><a href="examples.html">Bundled Examples</a></li> + <li><a href="example_connectors.html">Connecting to other systems</a></li> + </ul> + </li> + + <li>Execution + <ul> + <li><a href="local_execution.html">Local/Debugging</a></li> + <li><a href="cluster_execution.html">Cluster</a></li> + <li><a href="cli.html">Command-Line Interface</a></li> + <li><a href="web_client.html">Web Interface</a></li> + </ul> + </li> + + <li>Internals + <ul> + <li><a href="internal_overview.html">Overview</a></li> + </ul> + </li> </ul> + </div> <div class="col-md-9"> <h1>Frequently Asked Questions (FAQ)</h1> + + <ul id="markdown-toc"> + <li><a href="#general">General</a> <ul> + <li><a href="#is-flink-a-hadoop-project">Is Flink a Hadoop Project?</a></li> + <li><a 
href="#do-i-have-to-install-apache-hadoop-to-use-flink">Do I have to install Apache Hadoop to use Flink?</a></li> + </ul> + </li> + <li><a href="#usage">Usage</a> <ul> + <li><a href="#how-do-i-assess-the-progress-of-a-flink-program">How do I assess the progress of a Flink program?</a></li> + <li><a href="#how-can-i-figure-out-why-a-program-failed">How can I figure out why a program failed?</a></li> + <li><a href="#how-do-i-debug-flink-programs">How do I debug Flink programs?</a></li> + </ul> + </li> + <li><a href="#errors">Errors</a> <ul> + <li><a href="#why-am-i-getting-a-nonserializableexception-">Why am I getting a âNonSerializableExceptionâ ?</a></li> + <li><a href="#i-get-an-error-message-saying-that-not-enough-buffers-are-available-how-do-i-fix-this">I get an error message saying that not enough buffers are available. How do I fix this?</a></li> + <li><a href="#my-job-fails-early-with-a-javaioeofexception-what-could-be-the-cause">My job fails early with a java.io.EOFException. What could be the cause?</a></li> + <li><a href="#my-program-does-not-compute-the-correct-result-why-are-my-custom-key-types">My program does not compute the correct result. Why are my custom key types</a></li> + <li><a href="#i-get-a-javalanginstantiationexception-for-my-data-type-what-is-wrong">I get a java.lang.InstantiationException for my data type, what is wrong?</a></li> + <li><a href="#i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do">I canât stop Flink with the provided stop-scripts. What can I do?</a></li> + <li><a href="#i-got-an-outofmemoryexception-what-can-i-do">I got an OutOfMemoryException. What can I do?</a></li> + <li><a href="#why-do-the-taskmanager-log-files-become-so-huge">Why do the TaskManager log files become so huge?</a></li> + </ul> + </li> + <li><a href="#yarn-deployment">YARN Deployment</a> <ul> + <li><a href="#the-yarn-session-runs-only-for-a-few-seconds">The YARN session runs only for a few seconds</a></li> + <li><a href="#the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup">The YARN session crashes with a HDFS permission exception during startup</a></li> + </ul> + </li> + <li><a href="#features">Features</a> <ul> + <li><a href="#what-kind-of-fault-tolerance-does-flink-provide">What kind of fault-tolerance does Flink provide?</a></li> + <li><a href="#are-hadoop-like-utilities-such-as-counters-and-the-distributedcache-supported">Are Hadoop-like utilities, such as Counters and the DistributedCache supported?</a></li> + </ul> + </li> +</ul> - <h1 id="general">General</h1> +<h2 id="general">General</h2> -<h2 id="is-flink-a-hadoop-project?">Is Flink a Hadoop Project?</h2> +<h3 id="is-flink-a-hadoop-project">Is Flink a Hadoop Project?</h3> -<p>Flink is a data processing system and an alternative to Hadoop's +<p>Flink is a data processing system and an alternative to Hadoopâs MapReduce component. It comes with its own runtime, rather than building on top of MapReduce. As such, it can work completely independently of the Hadoop -ecosystem. However, Flink can also access Hadoop's distributed file -system (HDFS) to read and write data, and Hadoop's next-generation resource +ecosystem. However, Flink can also access Hadoopâs distributed file +system (HDFS) to read and write data, and Hadoopâs next-generation resource manager (YARN) to provision cluster resources. 
Since most Flink users are using Hadoop HDFS to store their data, we already ship the required libraries to access HDFS.</p> -<h2 id="do-i-have-to-install-apache-hadoop-to-use-flink?">Do I have to install Apache Hadoop to use Flink?</h2> +<h3 id="do-i-have-to-install-apache-hadoop-to-use-flink">Do I have to install Apache Hadoop to use Flink?</h3> <p>No. Flink can run without a Hadoop installation. However, a very common setup is to use Flink to analyze data stored in the Hadoop Distributed @@ -131,54 +188,71 @@ Hadoop client libraries with Flink by de <p>Additionally, we provide a special YARN Enabled download of Flink for users with an existing Hadoop YARN cluster. <a href="http://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YARN.html">Apache Hadoop YARN</a> -is Hadoop's cluster resource manager that allows to use +is Hadoop's cluster resource manager that allows running different execution engines next to each other on a cluster.</p> -<h1 id="usage">Usage</h1> +<h2 id="usage">Usage</h2> -<h2 id="how-do-i-assess-the-progress-of-a-flink-program?">How do I assess the progress of a Flink program?</h2> +<h3 id="how-do-i-assess-the-progress-of-a-flink-program">How do I assess the progress of a Flink program?</h3> <p>There are multiple ways to track the progress of a Flink program:</p> <ul> -<li>The JobManager (the master of the distributed system) starts a web interface + <li>The JobManager (the master of the distributed system) starts a web interface to observe program execution. It runs on port 8081 by default (configured in <code>conf/flink-config.yml</code>).</li> -<li>When you start a program from the command line, it will print the status + <li>When you start a program from the command line, it will print the status changes of all operators as the program progresses through the operations.</li> -<li>All status changes are also logged to the JobManager's log file.</li> + <li>All status changes are also logged to the JobManager's log file.</li> </ul> -<h2 id="how-can-i-figure-out-why-a-program-failed?">How can I figure out why a program failed?</h2> +<h3 id="how-can-i-figure-out-why-a-program-failed">How can I figure out why a program failed?</h3> <ul> -<li>Thw JobManager web frontend (by default on port 8081) displays the exceptions + <li>The JobManager web frontend (by default on port 8081) displays the exceptions of failed tasks.</li> -<li>If you run the program from the command-line, task exceptions are printed to + <li>If you run the program from the command-line, task exceptions are printed to the standard error stream and shown on the console.</li> -<li>Both the command line and the web interface allow you to figure out which + <li>Both the command line and the web interface allow you to figure out which parallel task first failed and caused the other tasks to cancel the execution.</li> -<li>Failing tasks and the corresponding exceptions are reported in the log files + <li>Failing tasks and the corresponding exceptions are reported in the log files of the master and the worker where the exception occurred (<code>log/flink-<user>-jobmanager-<host>.log</code> and <code>log/flink-<user>-taskmanager-<host>.log</code>).</li> </ul> -<h2 id="how-do-i-debug-flink-programs?">How do I debug Flink programs?</h2> +<h3 id="how-do-i-debug-flink-programs">How do I debug Flink programs?</h3> <ul> -<li>When you start a program locally with the <a href="local_execution.html">LocalExecutor</a>, you can place breakpoints in your functions and debug them like normal Java/Scala programs.</li> -<li>The <a href="java_api_guide.html#accumulators">Accumulators</a> are very helpful in + <li>The <a href="java_api_guide.html#accumulators">Accumulators</a> are very helpful in tracking the behavior of the parallel execution. They allow you to gather -information inside the program's operations and show them after the program +information inside the program's operations and show them after the program execution.</li> </ul>
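<p>To make the Accumulators tip above concrete, here is a minimal sketch of the documented accumulator pattern. It is illustrative only; the exact function base class differs between the 0.6 abstract-class API and the 0.7 <code>Rich*</code> functions.</p>

<div class="highlight"><pre><code class="language-java">import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// A mapper that counts how many records it processes. The accumulator is
// registered under a name, and its final value is reported together with
// the job result after the program finishes.
public class CountingMapper extends RichMapFunction<String, String> {

    private final IntCounter numRecords = new IntCounter();

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().addAccumulator("num-records", numRecords);
    }

    @Override
    public String map(String value) {
        numRecords.add(1);
        return value;
    }
}
</code></pre></div>

<p>After <code>env.execute()</code> returns, the value can be read from the returned <code>JobExecutionResult</code> via <code>getAccumulatorResult("num-records")</code>.</p>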
href="local_execution.html">LocalExecutor</a>, you can place breakpoints in your functions and debug them like normal Java/Scala programs.</li> -<li>The <a href="java_api_guide.html#accumulators">Accumulators</a> are very helpful in + <li>The <a href="java_api_guide.html#accumulators">Accumulators</a> are very helpful in tracking the behavior of the parallel execution. They allow you to gather -information inside the program's operations and show them after the program +information inside the programâs operations and show them after the program execution.</li> </ul> -<h1 id="errors">Errors</h1> +<h2 id="errors">Errors</h2> + +<h3 id="why-am-i-getting-a-nonserializableexception-">Why am I getting a âNonSerializableExceptionâ ?</h3> + +<p>All functions in Flink must be serializable, as defined by <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">java.io.Serializable</a>. +Since all function interfaces are serializable, the exception means that one +of the fields used in your function is not serializable.</p> + +<p>In particular, if your function is an inner class, or anonymous inner class, +it contains a hidden reference to the enclosing class (usually called <code>this$0</code>, if you look +at the function in the debugger). If the enclosing class is not serializable, this is probably +the source of the error. Solutions are to</p> + +<ul> + <li>make the function a standalone class, or a static inner class (no more reference to the enclosing class)</li> + <li>make the enclosing class serializable</li> + <li>use a Java 8 lambda function.</li> +</ul> -<h2 id="i-get-an-error-message-saying-that-not-enough-buffers-are-available.-how-do-i-fix-this?">I get an error message saying that not enough buffers are available. How do I fix this?</h2> +<h3 id="i-get-an-error-message-saying-that-not-enough-buffers-are-available-how-do-i-fix-this">I get an error message saying that not enough buffers are available. How do I fix this?</h3> <p>If you run Flink in a massively parallel setting (100+ parallel threads), you need to adapt the number of network buffers via the config parameter @@ -187,7 +261,7 @@ As a rule-of-thumb, the number of buffer <code>4 * numberOfNodes * numberOfTasksPerNode^2</code>. See <a href="config.html">Configuration Reference</a> for details.</p> -<h2 id="my-job-fails-early-with-a-java.io.eofexception.-what-could-be-the-cause?">My job fails early with a java.io.EOFException. What could be the cause?</h2> +<h3 id="my-job-fails-early-with-a-javaioeofexception-what-could-be-the-cause">My job fails early with a java.io.EOFException. What could be the cause?</h3> <p>Note: In version <em>0.4</em>, the delta iterations limit the solution set to records with fixed-length data types. We will in the next version.</p> @@ -196,7 +270,8 @@ records with fixed-length data types. We wrong HDFS version. 
-<h2 id="i-get-an-error-message-saying-that-not-enough-buffers-are-available.-how-do-i-fix-this?">I get an error message saying that not enough buffers are available. How do I fix this?</h2> +<h3 id="i-get-an-error-message-saying-that-not-enough-buffers-are-available-how-do-i-fix-this">I get an error message saying that not enough buffers are available. How do I fix this?</h3> <p>If you run Flink in a massively parallel setting (100+ parallel threads), you need to adapt the number of network buffers via the config parameter @@ -187,7 +261,7 @@ As a rule-of-thumb, the number of buffer <code>4 * numberOfNodes * numberOfTasksPerNode^2</code>. See <a href="config.html">Configuration Reference</a> for details.</p> -<h2 id="my-job-fails-early-with-a-java.io.eofexception.-what-could-be-the-cause?">My job fails early with a java.io.EOFException. What could be the cause?</h2> +<h3 id="my-job-fails-early-with-a-javaioeofexception-what-could-be-the-cause">My job fails early with a java.io.EOFException. What could be the cause?</h3> <p>Note: In version <em>0.4</em>, the delta iterations limit the solution set to records with fixed-length data types. We will remove this restriction in the next version.</p> @@ -196,7 +270,8 @@ records with fixed-length data types. We wrong HDFS version. Because different HDFS versions are often not compatible with each other, the connection between the filesystem master and the client breaks.</p> - +<div class="highlight"><pre><code class="language-bash">Call to <host:port> failed on <span class="nb">local </span>exception: java.io.EOFException at org.apache.hadoop.ipc.Client.wrapException<span class="o">(</span>Client.java:775<span class="o">)</span> at org.apache.hadoop.ipc.Client.call<span class="o">(</span>Client.java:743<span class="o">)</span> at org.apache.hadoop.ipc.RPC<span class="nv">$Invoker</span>.invoke<span class="o">(</span>RPC.java:220<span class="o">)</span> @@ -206,14 +281,13 @@ breaks.</p> at org.apache.hadoop.hdfs.DFSClient.<init><span class="o">(</span>DFSClient.java:207<span class="o">)</span> at org.apache.hadoop.hdfs.DFSClient.<init><span class="o">(</span>DFSClient.java:170<span class="o">)</span> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize<span class="o">(</span>DistributedFileSystem.java:82<span class="o">)</span> - at org.apache.flinkruntime.fs.hdfs.DistributedFileSystem.initialize<span class="o">(</span>DistributedFileSystem.java:276 -</code></pre></div> + at org.apache.flinkruntime.fs.hdfs.DistributedFileSystem.initialize<span class="o">(</span>DistributedFileSystem.java:276</code></pre></div> + <p>Please refer to the <a href="/downloads.html#maven">download page</a> and -the <a href=https://github.com/apache/incubator-flink/blob/master/README.md>build instructions</a> +the <a href="https://github.com/apache/incubator-flink/blob/master/README.md">build instructions</a> for details on how to set up Flink for different Hadoop and HDFS versions.</p> -<h2 id="my-program-does-not-compute-the-correct-result.-why-are-my-custom-key-types">My program does not compute the correct result. Why are my custom key types</h2> - +<h3 id="my-program-does-not-compute-the-correct-result-why-are-my-custom-key-types">My program does not compute the correct result. Why are my custom key types</h3> <p>are not grouped/joined correctly?</p> <p>Keys must correctly implement the methods <code>java.lang.Object#hashCode()</code>, @@ -221,14 +295,14 @@ for details on how to set up Flink for d These methods are always backed with default implementations which are usually inadequate. Therefore, all keys must override <code>hashCode()</code> and <code>equals(Object o)</code>.</p>
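<p>For illustration, a custom key type that meets these requirements could look like the following sketch (class and fields are hypothetical):</p>

<div class="highlight"><pre><code class="language-java">// Groups and joins correctly because equals() and hashCode()
// are overridden consistently with each other.
public class UserId {

    public String name;
    public int id;

    // Public nullary constructor, as required for data types.
    public UserId() {}

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof UserId)) {
            return false;
        }
        UserId other = (UserId) o;
        return id == other.id
            && (name == null ? other.name == null : name.equals(other.name));
    }

    @Override
    public int hashCode() {
        return 31 * id + (name == null ? 0 : name.hashCode());
    }
}
</code></pre></div>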
-<h2 id="i-get-a-java.lang.instantiationexception-for-my-data-type,-what-is-wrong?">I get a java.lang.InstantiationException for my data type, what is wrong?</h2> +<h3 id="i-get-a-javalanginstantiationexception-for-my-data-type-what-is-wrong">I get a java.lang.InstantiationException for my data type, what is wrong?</h3> <p>All data type classes must be public and have a public nullary constructor (constructor with no arguments). Furthermore, the classes must not be abstract or interfaces. If the classes are internal classes, they must be public and static.</p> -<h2 id="i-can't-stop-flink-with-the-provided-stop-scripts.-what-can-i-do?">I can't stop Flink with the provided stop-scripts. What can I do?</h2> +<h3 id="i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do">I can't stop Flink with the provided stop-scripts. What can I do?</h3> <p>Stopping the processes sometimes takes a few seconds, because the shutdown may do some cleanup work.</p> @@ -238,17 +312,17 @@ stopped with the provided stop-scripts ( cluster.sh</code>). You can kill their processes on Linux/Mac as follows:</p> <ul> -<li>Determine the process id (pid) of the JobManager / TaskManager process. You + <li>Determine the process id (pid) of the JobManager / TaskManager process. You can use the <code>jps</code> command on Linux (if you have OpenJDK installed) or command <code>ps -ef | grep java</code> to find all Java processes. </li> -<li>Kill the process with <code>kill -9 <pid></code>, where <code>pid</code> is the process id of the + <li>Kill the process with <code>kill -9 <pid></code>, where <code>pid</code> is the process id of the affected JobManager or TaskManager process.</li> </ul> <p>On Windows, the TaskManager shows a table of all processes and allows you to destroy a process by right-clicking its entry.</p> -<h2 id="i-got-an-outofmemoryexception.-what-can-i-do?">I got an OutOfMemoryException. What can I do?</h2> +<h3 id="i-got-an-outofmemoryexception-what-can-i-do">I got an OutOfMemoryException. What can I do?</h3> <p>These exceptions usually occur when the functions in the program consume a lot of memory by collecting large numbers of objects, for example in lists or maps. @@ -260,9 +334,12 @@ provided.</p> <p>There are two ways to go about this:</p> <ol> -<li><p>See whether you can use less memory inside the functions. For example, use -arrays of primitive types instead of object types.</p></li> -<li><p>Reduce the memory that Flink reserves for its own processing. The + <li> + <p>See whether you can use less memory inside the functions. For example, use +arrays of primitive types instead of object types.</p> + </li> + <li> + <p>Reduce the memory that Flink reserves for its own processing. The TaskManager reserves a certain portion of the available memory for sorting, hashing, caching, network buffering, etc. That part of the memory is unavailable to the user-defined functions. By reserving it, the system can guarantee to not @@ -272,23 +349,25 @@ destage operations to disk, if necessary the user-defined functions, you can reduce that value using the configuration entries <code>taskmanager.memory.fraction</code> or <code>taskmanager.memory.size</code>. See the <a href="config.html">Configuration Reference</a> for details. This will leave more memory to the JVM heap, -but may cause data processing tasks to go to disk more often.</p></li> +but may cause data processing tasks to go to disk more often.</p> + </li> </ol>
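<p>A tiny illustration of the first point (not from the original FAQ): buffering a million values as boxed objects costs several times the memory of a primitive array.</p>

<div class="highlight"><pre><code class="language-java">import java.util.ArrayList;
import java.util.List;

public class MemoryFootprint {
    public static void main(String[] args) {
        int n = 1000000;

        // Heavy: one boxed Integer object per value, plus ArrayList overhead.
        List<Integer> boxed = new ArrayList<Integer>(n);
        for (int i = 0; i < n; i++) {
            boxed.add(i);
        }

        // Lean: 4 bytes per value in a single contiguous allocation.
        int[] primitive = new int[n];
        for (int i = 0; i < n; i++) {
            primitive[i] = i;
        }
    }
}
</code></pre></div>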
-<h2 id="why-do-the-taskmanager-log-files-become-so-huge?">Why do the TaskManager log files become so huge?</h2> +<h3 id="why-do-the-taskmanager-log-files-become-so-huge">Why do the TaskManager log files become so huge?</h3> <p>Check the logging behavior of your jobs. Emitting a log message per record or tuple may be helpful to debug jobs in small setups with tiny data sets, but it becomes very inefficient and disk-space consuming if used for large input data.</p> -<h1 id="yarn-deployment">YARN Deployment</h1> +<h2 id="yarn-deployment">YARN Deployment</h2> -<h2 id="the-yarn-session-runs-only-for-a-few-seconds">The YARN session runs only for a few seconds</h2> +<h3 id="the-yarn-session-runs-only-for-a-few-seconds">The YARN session runs only for a few seconds</h3> <p>The <code>./bin/yarn-session.sh</code> script is intended to run while the YARN-session is open. In some error cases however, the script immediately stops running. The output looks like this:</p> - +<div class="highlight"><pre><code>07:34:27,004 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1395604279745_273123 to ResourceManager at jobtracker-host Flink JobManager is now running on worker1:6123 JobManager Web Interface: http://jobtracker-host:54311/proxy/application_1295604279745_273123/ 07:34:51,528 INFO org.apache.flinkyarn.Client - Application application_1295604279745_273123 finished with state FINISHED at 1398152089553 @@ -297,29 +376,37 @@ JobManager Web Interface: http://jobtrac 07:34:51,534 INFO org.apache.flinkyarn.Client - Deleting files in hdfs://user/marcus/.flink/application_1295604279745_273123 07:34:51,559 INFO org.apache.flinkyarn.Client - YARN Client is shutting down </code></pre></div> + <p>The problem here is that the Application Master (AM) is stopping and the YARN client assumes that the application has finished.</p> <p>There are three possible reasons for that behavior:</p> <ul> -<li><p>The ApplicationMaster exited with an exception. To debug that error, have a + <li> + <p>The ApplicationMaster exited with an exception. To debug that error, have a look in the logfiles of the container. The <code>yarn-site.xml</code> file contains the configured path. The key for the path is <code>yarn.nodemanager.log-dirs</code>, the -default value is <code>${yarn.log.dir}/userlogs</code>.</p></li> -<li><p>YARN has killed the container that runs the ApplicationMaster. This case -happens when the AM used too much memory or other resources beyond YARN's -limits. In this case, you'll find error messages in the nodemanager logs on -the host.</p></li> -<li><p>The operating system has shut down the JVM of the AM. This can happen if the +default value is <code>${yarn.log.dir}/userlogs</code>.</p> + </li> + <li> + <p>YARN has killed the container that runs the ApplicationMaster. This case +happens when the AM used too much memory or other resources beyond YARN's +limits. In this case, you'll find error messages in the nodemanager logs on +the host.</p> + </li> + <li> + <p>The operating system has shut down the JVM of the AM. This can happen if the YARN configuration is wrong and more memory than physically available is configured. Execute <code>dmesg</code> on the machine where the AM was running to see if -this happened.
You see messages from Linux' <a href="http://linux-mm.org/OOM_Killer">OOM killer</a>.</p> + </li> </ul> -<h2 id="the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup">The YARN session crashes with a HDFS permission exception during startup</h2> +<h3 id="the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup">The YARN session crashes with an HDFS permission exception during startup</h3> <p>While starting the YARN session, you receive an exception like this:</p> - +<div class="highlight"><pre><code>Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=robert, access=WRITE, inode="/user/robert":hdfs:supergroup:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:234) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:214) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:158) @@ -367,6 +454,7 @@ this happened. You see messages from Lin at org.apache.flinkyarn.Client.run(Client.java:362) at org.apache.flinkyarn.Client.main(Client.java:568) </code></pre></div> + <p>The reason for this error is that the home directory of the user <strong>in HDFS</strong> has the wrong permissions. The user (in this case <code>robert</code>) cannot create directories in his own home directory.</p> @@ -374,23 +462,24 @@ directories in his own home directory.</ <p>Flink creates a <code>.flink/</code> directory in the user's home directory where it stores the Flink jar and configuration file.</p> -<h1 id="features">Features</h1> +<h2 id="features">Features</h2> -<h2 id="what-kind-of-fault-tolerance-does-flink-provide?">What kind of fault-tolerance does Flink provide?</h2> +<h3 id="what-kind-of-fault-tolerance-does-flink-provide">What kind of fault-tolerance does Flink provide?</h3> <p>Flink can restart failed jobs. Mid-query fault tolerance will go into the open source project in the next versions.</p> -<h2 id="are-hadoop-like-utilities,-such-as-counters-and-the-distributedcache-supported?">Are Hadoop-like utilities, such as Counters and the DistributedCache supported?</h2> +<h3 id="are-hadoop-like-utilities-such-as-counters-and-the-distributedcache-supported">Are Hadoop-like utilities, such as Counters and the DistributedCache, supported?</h3> -<p><a href="java_api_guide.html#accumulators-&-counters">Flink's Accumulators</a> work very similar like -[Hadoop's counters, but are more powerful.</p> +<p><a href="java_api_guide.html#accumulators-&-counters">Flink's Accumulators</a> work very similarly to +Hadoop's counters, but are more powerful.</p> -<p>Flink has a <a href=https://github.com/apache/incubator-flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java>Distributed Cache</a> that is deeply integrated with the APIs.
Please refer to the <a href=https://github.com/apache/incubator-flink/blob/master//flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L561>JavaDocs</a> for details on how to use it.</p> +<p>Flink has a <a href="https://github.com/apache/incubator-flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java">Distributed Cache</a> that is deeply integrated with the APIs. Please refer to the <a href="https://github.com/apache/incubator-flink/blob/master//flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L561">JavaDocs</a> for details on how to use it.</p> <p>In order to make data sets available on all tasks, we encourage you to use <a href="java_api_guide.html#broadcast_variables">Broadcast Variables</a> instead. They are more efficient and easier to use than the distributed cache.</p> - + + <!-- Disqus Area --> <div style="padding-top:30px" id="disqus_thread"></div> <script type="text/javascript"> @@ -405,14 +494,12 @@ open source project in the next versions })(); </script> <noscript>Please enable JavaScript to view the <a href="http://disqus.com/?ref_noscript">comments powered by Disqus.</a></noscript> - <a href="http://disqus.com" class="dsq-brlink">comments powered by <span class="logo-disqus">Disqus</span></a> - </div> </div> <div class="footer"> - <p><hr class="divider"></p> + <hr class="divider" /> <p><small>Apache Flink is an effort undergoing incubation at The Apache Software Foundation (ASF), sponsored by the Apache Incubator PMC. Incubation is @@ -423,9 +510,10 @@ incubation status is not necessarily a r stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.</small></p> -<p><a href="http://incubator.apache.org/"><img src="/img/apache-incubator-logo.png" alt="Incubator Logo"></a></p> +<p><a href="http://incubator.apache.org/"><img src="/img/apache-incubator-logo.png" alt="Incubator Logo" /></a></p> -<p class="text-center"><a href="/privacy-policy.html">Privacy Policy<a></p> +<p class="text-center"><a href="privacy-policy.html">Privacy Policy<a> +</a></a></p> </div> </div> Modified: flink/site/docs/0.6-incubating/hadoop_compatibility.html URL: http://svn.apache.org/viewvc/flink/site/docs/0.6-incubating/hadoop_compatibility.html?rev=1657551&r1=1657550&r2=1657551&view=diff ============================================================================== --- flink/site/docs/0.6-incubating/hadoop_compatibility.html (original) +++ flink/site/docs/0.6-incubating/hadoop_compatibility.html Thu Feb 5 12:21:38 2015 @@ -4,113 +4,317 @@ <meta charset="utf-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1"> - <title>Apache Flink (incubating): Hadoop Compatability</title> - <link rel="stylesheet" href="/css/bootstrap.css"> - <link rel="stylesheet" href="/css/bootstrap-lumen-custom.css"> - <link rel="stylesheet" href="/css/syntax.css"> - <link rel="/css/custom.css"> - <link rel="css/codetabs.css"> - <link href="//maxcdn.bootstrapcdn.com/font-awesome/4.1.0/css/font-awesome.min.css" rel="stylesheet"> + <title>Apache Flink (incubating): Hadoop Compatibility</title> + <link rel="shortcut icon" href="favicon.ico" type="image/x-icon"> + <link rel="icon" href="favicon.ico" type="image/x-icon"> + <link rel="stylesheet" href="css/bootstrap.css"> + <link rel="stylesheet" href="css/bootstrap-lumen-custom.css"> + <link rel="stylesheet" href="css/syntax.css"> + <link rel="stylesheet" href="css/custom.css"> 
+ <link href="css/main/main.css" rel="stylesheet"> <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.0/jquery.min.js"></script> - <script src="/js/bootstrap.min.js"></script> + <script src="js/bootstrap.min.js"></script> <script src="js/codetabs.js"></script> </head> <body> <nav class="navbar navbar-default navbar-fixed-top" role="navigation"> <div class="container"> - <div class="navbar-header"> - <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse"> - <span class="sr-only">Toggle navigation</span> - <span class="icon-bar"></span> - <span class="icon-bar"></span> - <span class="icon-bar"></span> - </button> - <a class="navbar-brand" href="http://flink.incubator.apache.org">Apache Flink</a> - </div> - - <div class="collapse navbar-collapse" id="navbar-collapse-1"> - <ul class="nav navbar-nav"> + <div class="row"> + <div class="col-md-1 af-mobile-nav-bar"> + <a href="index.html" title="Home"> + <img class="hidden-xs hidden-sm img-responsive" + src="img/logo.png" alt="Apache Flink Logo"> + </a> + <div class="row visible-xs"> + <div class="col-xs-3"> + <a href="index.html" title="Home"> + <img class="hidden-x hidden-sm img-responsive" + src="img/logo.png" alt="Apache Flink Logo"> + </a> + </div> + <div class="col-xs-5"></div> + <div class="col-xs-4"> + <div class="af-mobile-btn"> + <span class="glyphicon glyphicon-plus"></span> + </div> + </div> + </div> + </div> + <!-- Navigation --> + <div class="col-md-11"> + <div class="collapse navbar-collapse" id="navbar-collapse-1"> + <ul class="nav navbar-nav"> + + <li> + <a href="index.html" class="">Documentation</a> + </li> + + <li> + <a href="api/java/index.html">Javadoc</a> + </li> + + <li> + <a href="api/scala/index.html#org.apache.flink.api.scala.package">Scaladoc</a> + </li> - <li> - <a href="index.html" class="">Documentation</a> - </li> - - <li> - <a href="api/java/index.html">Javadoc</a> - </li> - </ul> + </ul> + </div> + </div> </div> </div> </nav> - <div style="padding-top:70px" class="container"> + + <div style="padding-top:120px" class="container"> <div class="row"> <div class="col-md-3"> <ul> - <li>Quickstart - <ul> - <li><a href="setup_quickstart.html">Install</a></li> - <li><a href="run_example_quickstart.html">Run Example</a></li> - <li><a href="java_api_quickstart.html">Java API</a></li> - <li><a href="scala_api_quickstart.html">Scala API</a></li> - <li><a href="faq.html">FAQ</a></li> - </ul> - </li> - - <li>Setup & Configuration - <ul> - <li><a href="building.html">Build Instructions</a></li> - <li><a href="local_setup.html">Local Setup</a></li> - <li><a href="cluster_setup.html">Cluster Setup</a></li> - <li><a href="yarn_setup.html">YARN Setup</a></li> - <li><a href="config.html">Configuration</a></li> - </ul> - </li> - - <li>Programming Guides - <ul> - <li><a href="java_api_guide.html">Java API</a></li> - <li><a href="java_api_transformations.html">Java API Transformations</a></li> - <li><a href="scala_api_guide.html">Scala API</a></li> - <li><a href="iterations.html">Iterations</a></li> - <li><a href="spargel_guide.html">Spargel Graph API</a></li> - </ul> - </li> - - <li>Examples - <ul> - <li><a href="java_api_examples.html">Java API</a></li> - <li><a href="scala_api_examples.html">Scala API</a></li> - <li><a href="example_connectors.html">Connecting to other systems</a></li> - </ul> - </li> - - <li>Execution - <ul> - <li><a href="local_execution.html">Local/Debugging</a></li> - <li><a href="cluster_execution.html">Cluster</a></li> - <li><a href="cli.html">Command-Line 
Interface</a></li>
-            <li><a href="web_client.html">Web Interface</a></li>
-          </ul>
-        </li>
-
-        <li>Internals
-          <ul>
-            <li><a href="internal_overview.html">Overview</a></li>
-            <li><a href="internal_general_arch.html">General Architecture</a></li>
-            <li><a href="internal_add_operator.html">How-to: Adding a new Operator</a></li>
-          </ul>
-        </li>
+          <li><a href="faq.html">FAQ</a></li>
+          <li>Quickstart
+            <ul>
+              <li><a href="setup_quickstart.html">Setup</a></li>
+              <li><a href="run_example_quickstart.html">Run Example</a></li>
+              <li><a href="java_api_quickstart.html">Java API</a></li>
+              <li><a href="scala_api_quickstart.html">Scala API</a></li>
+            </ul>
+          </li>
+
+          <li>Setup & Configuration
+            <ul>
+              <li><a href="local_setup.html">Local Setup</a></li>
+              <li><a href="building.html">Build Flink</a></li>
+              <li><a href="cluster_setup.html">Cluster Setup</a></li>
+              <li><a href="yarn_setup.html">YARN Setup</a></li>
+              <li><a href="config.html">Configuration</a></li>
+            </ul>
+          </li>
+
+          <li>Programming Guides
+            <ul>
+              <li><a href="programming_guide.html">Programming Guide</a></li>
+              <li><a href="dataset_transformations.html">DataSet Transformations</a></li>
+              <li><a href="java8_programming_guide.html">Java 8 Programming Guide</a></li>
+              <li><a href="streaming_guide.html">Streaming Guide</a></li>
+              <li><a href="iterations.html">Iterations</a></li>
+              <li><a href="spargel_guide.html">Spargel Graph API</a></li>
+              <li><a href="hadoop_compatibility.html">Hadoop Compatibility</a></li>
+            </ul>
+          </li>
+
+          <li>Examples
+            <ul>
+              <li><a href="examples.html">Bundled Examples</a></li>
+              <li><a href="example_connectors.html">Connecting to other systems</a></li>
+            </ul>
+          </li>
+
+          <li>Execution
+            <ul>
+              <li><a href="local_execution.html">Local/Debugging</a></li>
+              <li><a href="cluster_execution.html">Cluster</a></li>
+              <li><a href="cli.html">Command-Line Interface</a></li>
+              <li><a href="web_client.html">Web Interface</a></li>
+            </ul>
+          </li>
+
+          <li>Internals
+            <ul>
+              <li><a href="internal_overview.html">Overview</a></li>
+            </ul>
+          </li>
        </ul>
+
      </div>
      <div class="col-md-9">
-        <h1>Hadoop Compatability</h1>
+        <h1>Hadoop Compatibility</h1>
+
+        <ul id="markdown-toc">
+          <li><a href="#project-configuration">Project Configuration</a></li>
+          <li><a href="#using-hadoop-data-types">Using Hadoop Data Types</a></li>
+          <li><a href="#using-hadoop-inputformats">Using Hadoop InputFormats</a></li>
+          <li><a href="#using-hadoop-outputformats">Using Hadoop OutputFormats</a></li>
+          <li><a href="#using-hadoop-mappers-and-reducers">Using Hadoop Mappers and Reducers</a></li>
+          <li><a href="#complete-hadoop-wordcount-example">Complete Hadoop WordCount Example</a></li>
+</ul>
+
+<p>Flink is compatible with many of Apache Hadoop's MapReduce interfaces and allows you to reuse a lot of code that was implemented for Hadoop MapReduce.</p>
+
+<p>You can:</p>
+
+<ul>
+  <li>use Hadoop's <code>Writable</code> <a href="programming_guide.html#data-types">data types</a> in Flink programs.</li>
+  <li>use any Hadoop <code>InputFormat</code> as a <a href="programming_guide.html#data-sources">DataSource</a>.</li>
+  <li>use any Hadoop <code>OutputFormat</code> as a <a href="programming_guide.html#data-sinks">DataSink</a>.</li>
+  <li>use a Hadoop <code>Mapper</code> as <a href="dataset_transformations.html#flatmap">FlatMapFunction</a>.</li>
+  <li>use a Hadoop <code>Reducer</code> as <a href="dataset_transformations.html#groupreduce-on-grouped-dataset">GroupReduceFunction</a>.</li>
+</ul>
+
+<p>This document shows how to use existing Hadoop MapReduce code with Flink.</p>
+
+<h3 id="project-configuration">Project Configuration</h3>
+
+<p>The Hadoop Compatibility Layer is part of the <code>flink-addons</code> Maven module. All relevant classes are located in the <code>org.apache.flink.hadoopcompatibility</code> package. It includes separate packages and classes for the Hadoop <code>mapred</code> and <code>mapreduce</code> APIs.</p>
+
+<p>Add the following dependency to your <code>pom.xml</code> to use the Hadoop Compatibility Layer.</p>
+
+<div class="highlight"><pre><code class="language-xml"><span class="nt"><dependency></span>
+	<span class="nt"><groupId></span>org.apache.flink<span class="nt"></groupId></span>
+	<span class="nt"><artifactId></span>flink-hadoop-compatibility<span class="nt"></artifactId></span>
+	<span class="nt"><version></span>0.7.0-incubating<span class="nt"></version></span>
+<span class="nt"></dependency></span></code></pre></div>
+
+<h3 id="using-hadoop-data-types">Using Hadoop Data Types</h3>
+
+<p>Flink supports all Hadoop <code>Writable</code> and <code>WritableComparable</code> data types out-of-the-box. You do not need to include the Hadoop Compatibility dependency if you only want to use your Hadoop data types. See the <a href="programming_guide.html#data-types">Programming Guide</a> for more details.</p>
-      <p>To be written.</p>
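+
+<p>For illustration only (this sketch is not part of the original example set), Hadoop types from <code>org.apache.hadoop.io</code> can be used like any other field type, assuming <code>env.fromElements()</code> is available on the <code>ExecutionEnvironment</code>:</p>
+
+<div class="highlight"><pre><code class="language-java">// Sketch: Writable types used as plain tuple fields, without the
+// flink-hadoop-compatibility dependency. Text and LongWritable come
+// from org.apache.hadoop.io; DataSet and Tuple2 from the Flink Java API.
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Tuple2<Text, LongWritable>> counts = env.fromElements(
+    new Tuple2<Text, LongWritable>(new Text("flink"), new LongWritable(2L)),
+    new Tuple2<Text, LongWritable>(new Text("hadoop"), new LongWritable(1L)));
+
+// WritableComparable fields also work as keys, e.g. in groupBy(0).
+counts.print();
+
+env.execute();</code></pre></div>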
+<h3 id="using-hadoop-inputformats">Using Hadoop InputFormats</h3>
+<p>Flink provides a compatibility wrapper for Hadoop <code>InputFormats</code>. Any class that implements <code>org.apache.hadoop.mapred.InputFormat</code> or extends <code>org.apache.hadoop.mapreduce.InputFormat</code> is supported. Thus, Flink can handle Hadoop built-in formats such as <code>TextInputFormat</code> as well as external formats such as Hive's <code>HCatInputFormat</code>. Data read from Hadoop InputFormats is converted into a <code>DataSet<Tuple2<KEY,VALUE>></code> where <code>KEY</code> is the key and <code>VALUE</code> is the value of the original Hadoop key-value pair.</p>
+<p>Flink's InputFormat wrappers are</p>
+
+<ul>
+  <li><code>org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat</code> and</li>
+  <li><code>org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat</code></li>
+</ul>
+
+<p>and can be used as regular Flink <a href="programming_guide.html#data-sources">InputFormats</a>.</p>
+
+<p>The following example shows how to use Hadoop's <code>TextInputFormat</code>.</p>
+
+<div class="highlight"><pre><code class="language-java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
+
+<span class="c1">// Set up the Hadoop TextInputFormat.</span>
+<span class="n">Job</span> <span class="n">job</span> <span class="o">=</span> <span class="n">Job</span><span class="o">.</span><span class="na">getInstance</span><span class="o">();</span>
+<span class="n">HadoopInputFormat</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">></span> <span class="n">hadoopIF</span> <span class="o">=</span>
+  <span class="c1">// create the Flink wrapper.</span>
+  <span class="k">new</span> <span class="n">HadoopInputFormat</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">>(</span>
+    <span class="c1">// create the Hadoop InputFormat, specify key and value type, and job.</span>
+    <span class="k">new</span> <span class="nf">TextInputFormat</span><span class="o">(),</span> <span class="n">LongWritable</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Text</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">job</span>
+  <span class="o">);</span>
+<span class="n">TextInputFormat</span><span class="o">.</span><span class="na">addInputPath</span><span class="o">(</span><span class="n">job</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">inputPath</span><span class="o">));</span>
+
+<span class="c1">// Read data using the Hadoop TextInputFormat.</span>
+<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">>></span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">createInput</span><span class="o">(</span><span class="n">hadoopIF</span><span class="o">);</span>
+
+<span class="c1">// Do something with the data.</span>
+<span class="o">[...]</span></code></pre></div>
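+
+<p>The example above uses the <code>mapreduce</code> API. The following is only a sketch of the corresponding <code>mapred</code>-API wrapper, under the assumption (not stated in the original text) that its constructor takes a <code>JobConf</code> in place of the <code>Job</code>; it reuses <code>env</code> and <code>inputPath</code> from above:</p>
+
+<div class="highlight"><pre><code class="language-java">// Sketch: reading with org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat.
+JobConf jobConf = new JobConf();
+HadoopInputFormat<LongWritable, Text> mapredIF =
+    new HadoopInputFormat<LongWritable, Text>(
+        new org.apache.hadoop.mapred.TextInputFormat(), LongWritable.class, Text.class, jobConf);
+org.apache.hadoop.mapred.FileInputFormat.addInputPath(jobConf, new Path(inputPath));
+
+// The wrapper is consumed like any other Flink InputFormat.
+DataSet<Tuple2<LongWritable, Text>> mapredText = env.createInput(mapredIF);</code></pre></div>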
+
+<h3 id="using-hadoop-outputformats">Using Hadoop OutputFormats</h3>
+
+<p>Flink provides a compatibility wrapper for Hadoop <code>OutputFormats</code>. Any class that implements <code>org.apache.hadoop.mapred.OutputFormat</code> or extends <code>org.apache.hadoop.mapreduce.OutputFormat</code> is supported. The OutputFormat wrapper expects its input data to be a <code>DataSet<Tuple2<KEY,VALUE>></code> where <code>KEY</code> is the key and <code>VALUE</code> is the value of the Hadoop key-value pair that is processed by the Hadoop OutputFormat.</p>
+
+<p>Flink's OutputFormat wrappers are</p>
+
+<ul>
+  <li><code>org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat</code> and</li>
+  <li><code>org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat</code></li>
+</ul>
+
+<p>and can be used as regular Flink <a href="programming_guide.html#data-sinks">OutputFormats</a>.</p>
+
+<p>The following example shows how to use Hadoop's <code>TextOutputFormat</code>.</p>
+
+<div class="highlight"><pre><code class="language-java"><span class="c1">// Obtain your result to emit.</span>
+<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">>></span> <span class="n">hadoopResult</span> <span class="o">=</span> <span class="o">[...]</span>
+
+<span class="c1">// Set up the Hadoop TextOutputFormat.</span>
+<span class="n">Job</span> <span class="n">job</span> <span class="o">=</span> <span class="n">Job</span><span class="o">.</span><span class="na">getInstance</span><span class="o">();</span>
+<span class="n">HadoopOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">></span> <span class="n">hadoopOF</span> <span class="o">=</span>
+  <span class="c1">// create the Flink wrapper.</span>
+  <span class="k">new</span> <span class="n">HadoopOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">>(</span>
+    <span class="c1">// set the Hadoop OutputFormat and specify the job.</span>
+    <span class="k">new</span> <span class="n">TextOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">>(),</span> <span class="n">job</span>
+  <span class="o">);</span>
+<span class="n">hadoopOF</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="s">"mapreduce.output.textoutputformat.separator"</span><span class="o">,</span> <span class="s">" "</span><span class="o">);</span>
+<span class="n">TextOutputFormat</span><span class="o">.</span><span class="na">setOutputPath</span><span class="o">(</span><span class="n">job</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">outputPath</span><span class="o">));</span>
+
+<span class="c1">// Emit data using the Hadoop TextOutputFormat.</span>
+<span class="n">hadoopResult</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">hadoopOF</span><span class="o">)</span>
+  <span class="o">.</span><span class="na">setParallelism</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span></code></pre></div>
+
+<p><strong>Please note:</strong> At the moment, Hadoop OutputFormats must be executed with a parallelism of 1 (DOP = 1). This limitation will be resolved soon.</p>
+
+<h3 id="using-hadoop-mappers-and-reducers">Using Hadoop Mappers and Reducers</h3>
+
+<p>Hadoop Mappers are semantically equivalent to Flink's <a href="dataset_transformations.html#flatmap">FlatMapFunctions</a> and Hadoop Reducers are equivalent to Flink's <a href="dataset_transformations.html#groupreduce-on-grouped-dataset">GroupReduceFunctions</a>. Flink provides wrappers for implementations of Hadoop MapReduce's <code>Mapper</code> and <code>Reducer</code> interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the <code>Mapper</code> and <code>Reducer</code> interfaces of Hadoop's mapred API (<code>org.apache.hadoop.mapred</code>) are supported.</p>
+
+<p>The wrappers take a <code>DataSet<Tuple2<KEYIN,VALUEIN>></code> as input and produce a <code>DataSet<Tuple2<KEYOUT,VALUEOUT>></code> as output where <code>KEYIN</code> and <code>KEYOUT</code> are the keys and <code>VALUEIN</code> and <code>VALUEOUT</code> are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with a Combiner (<code>HadoopReduceCombineFunction</code>) and without one (<code>HadoopReduceFunction</code>). The wrappers accept an optional <code>JobConf</code> object to configure the Hadoop Mapper or Reducer.</p>
+
+<p>Flink's function wrappers are</p>
+
+<ul>
+  <li><code>org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction</code>,</li>
+  <li><code>org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction</code>, and</li>
+  <li><code>org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction</code></li>
+</ul>
+
+<p>and can be used as regular Flink <a href="dataset_transformations.html#flatmap">FlatMapFunctions</a> or <a href="dataset_transformations.html#groupreduce-on-grouped-dataset">GroupReduceFunctions</a>.</p>
+
+<p>The following example shows how to use Hadoop <code>Mapper</code> and <code>Reducer</code> functions; a JobConf-based variant follows after it.</p>
+
+<div class="highlight"><pre><code class="language-java"><span class="c1">// Obtain data to process somehow.</span>
+<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">>></span> <span class="n">text</span> <span class="o">=</span> <span class="o">[...]</span>
+
+<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>></span> <span class="n">result</span> <span class="o">=</span> <span class="n">text</span>
+  <span class="c1">// use Hadoop Mapper (Tokenizer) as FlatMapFunction</span>
+  <span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopMapFunction</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>(</span>
+                <span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">()</span>
+  <span class="o">))</span>
+  <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+  <span class="c1">// use Hadoop Reducer (Counter) as Reduce- and CombineFunction</span>
+  <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopReduceCombineFunction</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>(</span>
+                <span class="k">new</span> <span class="nf">Counter</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Counter</span><span class="o">()</span>
+  <span class="o">));</span></code></pre></div>
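+
+<p>The optional <code>JobConf</code> mentioned above is not used in the example. As a sketch (assuming the wrapper constructors take the <code>JobConf</code> as an additional argument and hand it to the wrapped function's <code>configure()</code> method), configuring the Mapper might look like this:</p>
+
+<div class="highlight"><pre><code class="language-java">// Sketch: pass a JobConf through the wrapper; "my.tokenizer.pattern" is a
+// hypothetical parameter that Tokenizer.configure() would read.
+JobConf conf = new JobConf();
+conf.set("my.tokenizer.pattern", "\\W+");
+
+DataSet<Tuple2<Text, LongWritable>> tokenized = text
+    .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+        new Tokenizer(), conf));</code></pre></div>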
+
+<p><strong>Please note:</strong> The Reducer wrapper works on groups as defined by Flink's <a href="dataset_transformations.html#transformations-on-grouped-dataset">groupBy()</a> operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the <code>JobConf</code>.</p>
+
+<h3 id="complete-hadoop-wordcount-example">Complete Hadoop WordCount Example</h3>
+
+<p>The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.</p>
+
+<div class="highlight"><pre><code class="language-java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
+
+<span class="c1">// Set up the Hadoop TextInputFormat.</span>
+<span class="n">Job</span> <span class="n">job</span> <span class="o">=</span> <span class="n">Job</span><span class="o">.</span><span class="na">getInstance</span><span class="o">();</span>
+<span class="n">HadoopInputFormat</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">></span> <span class="n">hadoopIF</span> <span class="o">=</span>
+  <span class="k">new</span> <span class="n">HadoopInputFormat</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">>(</span>
+    <span class="k">new</span> <span class="nf">TextInputFormat</span><span class="o">(),</span> <span class="n">LongWritable</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Text</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">job</span>
+  <span class="o">);</span>
+<span class="n">TextInputFormat</span><span class="o">.</span><span class="na">addInputPath</span><span class="o">(</span><span class="n">job</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">inputPath</span><span class="o">));</span>
+
+<span class="c1">// Read data using the Hadoop TextInputFormat.</span>
+<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">>></span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">createInput</span><span class="o">(</span><span class="n">hadoopIF</span><span class="o">);</span>
+
+<span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>></span> <span class="n">result</span> <span class="o">=</span> <span class="n">text</span>
+  <span class="c1">// use Hadoop Mapper (Tokenizer) as FlatMapFunction</span>
+  <span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopMapFunction</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>(</span>
+                <span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">()</span>
+  <span class="o">))</span>
+  <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+  <span class="c1">// use Hadoop Reducer (Counter) as Reduce- and CombineFunction</span>
+  <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopReduceCombineFunction</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>(</span>
+                <span class="k">new</span> <span class="nf">Counter</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Counter</span><span class="o">()</span>
+  <span class="o">));</span>
+
+<span class="c1">// Set up the Hadoop TextOutputFormat.</span>
+<span class="n">HadoopOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">></span> <span class="n">hadoopOF</span> <span class="o">=</span>
+  <span class="k">new</span> <span class="n">HadoopOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>(</span>
+    <span class="k">new</span> <span class="n">TextOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>(),</span> <span class="n">job</span>
+  <span class="o">);</span>
+<span class="n">hadoopOF</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="s">"mapreduce.output.textoutputformat.separator"</span><span class="o">,</span> <span class="s">" "</span><span class="o">);</span>
+<span class="n">TextOutputFormat</span><span class="o">.</span><span class="na">setOutputPath</span><span class="o">(</span><span class="n">job</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">outputPath</span><span class="o">));</span>
+
+<span class="c1">// Emit data using the Hadoop TextOutputFormat.</span>
+<span class="n">result</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">hadoopOF</span><span class="o">)</span>
+  <span class="o">.</span><span class="na">setParallelism</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
+
+<span class="c1">// Execute Program</span>
+<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">"Hadoop WordCount"</span><span class="o">);</span></code></pre></div>
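+
+<p>The <code>Tokenizer</code> and <code>Counter</code> classes are not shown above; they stand for ordinary user-written Hadoop <code>mapred</code>-API functions along the lines of the classic WordCount. A minimal sketch of what they might look like (the implementations are an assumption, only the interfaces come from <code>org.apache.hadoop.mapred</code>):</p>
+
+<div class="highlight"><pre><code class="language-java">// Sketch of the assumed user-provided Hadoop functions. Uses
+// org.apache.hadoop.mapred.{Mapper, Reducer, JobConf, OutputCollector, Reporter},
+// java.util.Iterator and java.io.IOException.
+public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
+  public void map(LongWritable key, Text value,
+      OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
+    // emit (word, 1) for every token of the line
+    for (String token : value.toString().toLowerCase().split("\\W+")) {
+      if (!token.isEmpty()) {
+        out.collect(new Text(token), new LongWritable(1L));
+      }
+    }
+  }
+  public void configure(JobConf job) {}
+  public void close() {}
+}
+
+public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
+  public void reduce(Text key, Iterator<LongWritable> values,
+      OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
+    // sum up the counts for a word
+    long sum = 0;
+    while (values.hasNext()) {
+      sum += values.next().get();
+    }
+    out.collect(key, new LongWritable(sum));
+  }
+  public void configure(JobConf job) {}
+  public void close() {}
+}</code></pre></div>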
+
+
+
  <!-- Disqus Area -->
  <div style="padding-top:30px" id="disqus_thread"></div>
  <script type="text/javascript">
@@ -125,14 +329,12 @@
  })();
  </script>
  <noscript>Please enable JavaScript to view the <a href="http://disqus.com/?ref_noscript">comments powered by Disqus.</a></noscript>
-  <a href="http://disqus.com" class="dsq-brlink">comments powered by <span class="logo-disqus">Disqus</span></a>
-
 </div>
 </div>
 <div class="footer">
-  <p><hr class="divider"></p>
+  <hr class="divider" />
 <p><small>Apache Flink is an effort undergoing incubation at The Apache Software Foundation (ASF), sponsored by the Apache Incubator PMC. Incubation is
@@ -143,9 +345,10 @@ incubation status is not necessarily a r
 stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.</small></p>

-<p><a href="http://incubator.apache.org/"><img src="/img/apache-incubator-logo.png" alt="Incubator Logo"></a></p>
+<p><a href="http://incubator.apache.org/"><img src="/img/apache-incubator-logo.png" alt="Incubator Logo" /></a></p>

-<p class="text-center"><a href="/privacy-policy.html">Privacy Policy<a></p>
+<p class="text-center"><a href="privacy-policy.html">Privacy Policy</a></p>
 </div>
 </div>
