Repository: flink-web Updated Branches: refs/heads/asf-site b0b9c2626 -> efd6ccaec
Update FAQ Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/efd6ccae Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/efd6ccae Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/efd6ccae Branch: refs/heads/asf-site Commit: efd6ccaecb02157ca0816c6f338ccca71dc0b9f9 Parents: b0b9c26 Author: Stephan Ewen <[email protected]> Authored: Thu Jan 18 20:56:32 2018 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Jan 19 18:36:48 2018 +0100 ---------------------------------------------------------------------- content/faq.html | 520 +++++++++++--------------------------------------- faq.md | 463 ++++++++++---------------------------------- 2 files changed, 206 insertions(+), 777 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink-web/blob/efd6ccae/content/faq.html ---------------------------------------------------------------------- diff --git a/content/faq.html b/content/faq.html index a9b7090..5de5014 100644 --- a/content/faq.html +++ b/content/faq.html @@ -156,169 +156,119 @@ specific language governing permissions and limitations under the License. --> -<p>The following questions are frequently asked with regard to the Flink project <strong>in general</strong>. If you have further questions, make sure to consult the <a href="http://ci.apache.org/projects/flink/flink-docs-master">documentation</a> or <a href="/community.html">ask the community</a>.</p> +<p>The following questions are frequently asked with regard to the Flink project <strong>in general</strong>. +If you have further questions, make sure to consult the <a href="http://ci.apache.org/projects/flink/flink-docs-master">documentation</a> or <a href="/community.html">ask the community</a>.</p> <div class="page-toc"> <ul id="markdown-toc"> <li><a href="#general" id="markdown-toc-general">General</a> <ul> - <li><a href="#is-flink-a-hadoop-project" id="markdown-toc-is-flink-a-hadoop-project">Is Flink a Hadoop Project?</a></li> - <li><a href="#do-i-have-to-install-apache-hadoop-to-use-flink" id="markdown-toc-do-i-have-to-install-apache-hadoop-to-use-flink">Do I have to install Apache Hadoop to use Flink?</a></li> + <li><a href="#is-apache-flink-only-for-near-real-time-processing-use-cases" id="markdown-toc-is-apache-flink-only-for-near-real-time-processing-use-cases">Is Apache Flink only for (near) real-time processing use cases?</a></li> + <li><a href="#if-everything-is-a-stream-why-are-there-a-datastream-and-a-dataset-api-in-flink" id="markdown-toc-if-everything-is-a-stream-why-are-there-a-datastream-and-a-dataset-api-in-flink">If everything is a stream, why are there a DataStream and a DataSet API in Flink?</a></li> + <li><a href="#how-does-flink-relate-to-the-hadoop-stack" id="markdown-toc-how-does-flink-relate-to-the-hadoop-stack">How does Flink relate to the Hadoop Stack?</a></li> + <li><a href="#what-other-stacks-does-flink-run-in" id="markdown-toc-what-other-stacks-does-flink-run-in">What other stacks does Flink run in?</a></li> + <li><a href="#what-are-the-prerequisites-to-use-flink" id="markdown-toc-what-are-the-prerequisites-to-use-flink">What are the prerequisites to use Flink?</a></li> + <li><a href="#what-scale-does-flink-support" id="markdown-toc-what-scale-does-flink-support">What scale does Flink support?</a></li> + <li><a href="#is-flink-limited-to-in-memory-data-sets" id="markdown-toc-is-flink-limited-to-in-memory-data-sets">Is Flink 
limited to in-memory data sets?</a></li> </ul> </li> - <li><a href="#usage" id="markdown-toc-usage">Usage</a> <ul> - <li><a href="#how-do-i-assess-the-progress-of-a-flink-program" id="markdown-toc-how-do-i-assess-the-progress-of-a-flink-program">How do I assess the progress of a Flink program?</a></li> - <li><a href="#how-can-i-figure-out-why-a-program-failed" id="markdown-toc-how-can-i-figure-out-why-a-program-failed">How can I figure out why a program failed?</a></li> - <li><a href="#how-do-i-debug-flink-programs" id="markdown-toc-how-do-i-debug-flink-programs">How do I debug Flink programs?</a></li> - <li><a href="#what-is-the-parallelism-how-do-i-set-it" id="markdown-toc-what-is-the-parallelism-how-do-i-set-it">What is the parallelism? How do I set it?</a></li> - </ul> - </li> - <li><a href="#errors" id="markdown-toc-errors">Errors</a> <ul> - <li><a href="#why-am-i-getting-a-nonserializableexception-" id="markdown-toc-why-am-i-getting-a-nonserializableexception-">Why am I getting a âNonSerializableExceptionâ ?</a></li> - <li><a href="#in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters" id="markdown-toc-in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters">In Scala API, I get an error about implicit values and evidence parameters</a></li> - <li><a href="#i-get-an-error-message-saying-that-not-enough-buffers-are-available-how-do-i-fix-this" id="markdown-toc-i-get-an-error-message-saying-that-not-enough-buffers-are-available-how-do-i-fix-this">I get an error message saying that not enough buffers are available. How do I fix this?</a></li> - <li><a href="#my-job-fails-early-with-a-javaioeofexception-what-could-be-the-cause" id="markdown-toc-my-job-fails-early-with-a-javaioeofexception-what-could-be-the-cause">My job fails early with a java.io.EOFException. 
What could be the cause?</a></li> + <li><a href="#debugging-and-common-error-messages" id="markdown-toc-debugging-and-common-error-messages">Debugging and Common Error Messages</a> <ul> + <li><a href="#i-have-a-notserializableexception" id="markdown-toc-i-have-a-notserializableexception">I have a NotSerializableException.</a></li> + <li><a href="#using-the-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters" id="markdown-toc-using-the-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters">Using the Scala API, I get an error about implicit values and evidence parameters.</a></li> + <li><a href="#i-see-a-classcastexception-x-cannot-be-cast-to-x" id="markdown-toc-i-see-a-classcastexception-x-cannot-be-cast-to-x">I see a ClassCastException: X cannot be cast to X.</a></li> + <li><a href="#i-have-an-abstractmethoderror-or-nosuchfielderror" id="markdown-toc-i-have-an-abstractmethoderror-or-nosuchfielderror">I have an AbstractMethodError or NoSuchFieldError.</a></li> + <li><a href="#my-datastream-application-produces-no-output-even-though-events-are-going-in" id="markdown-toc-my-datastream-application-produces-no-output-even-though-events-are-going-in">My DataStream application produces no output, even though events are going in.</a></li> + <li><a href="#i-see-an-exception-reporting-insufficient-number-of-network-buffers" id="markdown-toc-i-see-an-exception-reporting-insufficient-number-of-network-buffers">I see an exception reporting âInsufficient number of network buffersâ.</a></li> <li><a href="#my-job-fails-with-various-exceptions-from-the-hdfshadoop-code-what-can-i-do" id="markdown-toc-my-job-fails-with-various-exceptions-from-the-hdfshadoop-code-what-can-i-do">My job fails with various exceptions from the HDFS/Hadoop code. What can I do?</a></li> - <li><a href="#in-eclipse-i-get-compilation-errors-in-the-scala-projects" id="markdown-toc-in-eclipse-i-get-compilation-errors-in-the-scala-projects">In Eclipse, I get compilation errors in the Scala projects</a></li> - <li><a href="#my-program-does-not-compute-the-correct-result-why-are-my-custom-key-types-not-groupedjoined-correctly" id="markdown-toc-my-program-does-not-compute-the-correct-result-why-are-my-custom-key-types-not-groupedjoined-correctly">My program does not compute the correct result. Why are my custom key types not grouped/joined correctly?</a></li> - <li><a href="#i-get-a-javalanginstantiationexception-for-my-data-type-what-is-wrong" id="markdown-toc-i-get-a-javalanginstantiationexception-for-my-data-type-what-is-wrong">I get a java.lang.InstantiationException for my data type, what is wrong?</a></li> - <li><a href="#i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do" id="markdown-toc-i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do">I canât stop Flink with the provided stop-scripts. What can I do?</a></li> - <li><a href="#i-got-an-outofmemoryexception-what-can-i-do" id="markdown-toc-i-got-an-outofmemoryexception-what-can-i-do">I got an OutOfMemoryException. What can I do?</a></li> - <li><a href="#why-do-the-taskmanager-log-files-become-so-huge" id="markdown-toc-why-do-the-taskmanager-log-files-become-so-huge">Why do the TaskManager log files become so huge?</a></li> - <li><a href="#the-slot-allocated-for-my-task-manager-has-been-released-what-should-i-do" id="markdown-toc-the-slot-allocated-for-my-task-manager-has-been-released-what-should-i-do">The slot allocated for my task manager has been released. 
What should I do?</a></li> - </ul> - </li> - <li><a href="#yarn-deployment" id="markdown-toc-yarn-deployment">YARN Deployment</a> <ul> - <li><a href="#the-yarn-session-runs-only-for-a-few-seconds" id="markdown-toc-the-yarn-session-runs-only-for-a-few-seconds">The YARN session runs only for a few seconds</a></li> - <li><a href="#my-yarn-containers-are-killed-because-they-use-too-much-memory" id="markdown-toc-my-yarn-containers-are-killed-because-they-use-too-much-memory">My YARN containers are killed because they use too much memory</a></li> - <li><a href="#the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup" id="markdown-toc-the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup">The YARN session crashes with a HDFS permission exception during startup</a></li> - <li><a href="#my-job-is-not-reacting-to-a-job-cancellation" id="markdown-toc-my-job-is-not-reacting-to-a-job-cancellation">My job is not reacting to a job cancellation?</a></li> - </ul> - </li> - <li><a href="#features" id="markdown-toc-features">Features</a> <ul> - <li><a href="#what-kind-of-fault-tolerance-does-flink-provide" id="markdown-toc-what-kind-of-fault-tolerance-does-flink-provide">What kind of fault-tolerance does Flink provide?</a></li> - <li><a href="#are-hadoop-like-utilities-such-as-counters-and-the-distributedcache-supported" id="markdown-toc-are-hadoop-like-utilities-such-as-counters-and-the-distributedcache-supported">Are Hadoop-like utilities, such as Counters and the DistributedCache supported?</a></li> </ul> </li> </ul> </div> -<h2 id="general">General</h2> +<h1 id="general">General</h1> -<h3 id="is-flink-a-hadoop-project">Is Flink a Hadoop Project?</h3> +<h2 id="is-apache-flink-only-for-near-real-time-processing-use-cases">Is Apache Flink only for (near) real-time processing use cases?</h2> -<p>Flink is a data processing system and an <strong>alternative to Hadoopâs -MapReduce component</strong>. It comes with its <em>own runtime</em> rather than building on top -of MapReduce. As such, it can work completely independently of the Hadoop -ecosystem. However, Flink can also access Hadoopâs distributed file -system (HDFS) to read and write data, and Hadoopâs next-generation resource -manager (YARN) to provision cluster resources. Since most Flink users are -using Hadoop HDFS to store their data, Flink already ships the required libraries to -access HDFS.</p> +<p>Flink is a very general system for data processing and data-driven applications with <em>data streams</em> as +the core building block. These data streams can be streams of real-time data, or stored streams of historic data. +For example, in Flinkâs view a file is a stored stream of bytes. Because of that, Flink +supports both real-time data processing and applications, as well as batch processing applications.</p> -<h3 id="do-i-have-to-install-apache-hadoop-to-use-flink">Do I have to install Apache Hadoop to use Flink?</h3> +<p>Streams can be <em>unbounded</em> (have no end, events continuously keep coming) or be <em>bounded</em> (streams have a beginning +and an end). For example, a Twitter feed or a stream of events from a message queue are generally unbounded streams, +whereas a stream of bytes from a file is a bounded stream.</p> -<p><strong>No</strong>. Flink can run <strong>without</strong> a Hadoop installation. However, a <em>very common</em> -setup is to use Flink to analyze data stored in the Hadoop Distributed -File System (HDFS). 
To make these setups work out of the box, Flink bundles the -Hadoop client libraries by default.</p> +<h2 id="if-everything-is-a-stream-why-are-there-a-datastream-and-a-dataset-api-in-flink">If everything is a stream, why are there a DataStream and a DataSet API in Flink?</h2> -<p>Additionally, we provide a special YARN Enabled download of Flink for -users with an existing Hadoop YARN cluster. <a href="http://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html">Apache Hadoop -YARN</a> -is Hadoopâs cluster resource manager that allows use of -different execution engines next to each other on a cluster.</p> +<p>Bounded streams are often more efficient to process than unbounded streams. Processing unbounded streams of events +in (near) real-time requires the system to be able to immediately act on events and to produce intermediate +results (often with low latency). Processing bounded streams usually does not require to produce low latency results, because the data is a while old +anyway (in relative terms). That allows Flink to process the data in a simple and more efficient way.</p> -<h2 id="usage">Usage</h2> +<p>The <em>DataStream</em> API captures the continuous processing of unbounded and bounded streams, with a model that supports +low latency results and flexible reaction to events and time (including event time).</p> -<h3 id="how-do-i-assess-the-progress-of-a-flink-program">How do I assess the progress of a Flink program?</h3> +<p>The <em>DataSet</em> API has techniques that often speed up the processing of bounded data streams. In the future, the community +plans to combine these optimizations with the techniques in the DataStream API.</p> -<p>There are a multiple of ways to track the progress of a Flink program:</p> +<h2 id="how-does-flink-relate-to-the-hadoop-stack">How does Flink relate to the Hadoop Stack?</h2> -<ul> - <li>The JobManager (the master of the distributed system) starts a web interface -to observe program execution. It runs on port 8081 by default (configured in -<code>conf/flink-config.yml</code>).</li> - <li>When you start a program from the command line, it will print the status -changes of all operators as the program progresses through the operations.</li> - <li>All status changes are also logged to the JobManagerâs log file.</li> -</ul> +<p>Flink is independent of <a href="https://hadoop.apache.org/">Apache Hadoop</a> and runs without any Hadoop dependencies.</p> -<h3 id="how-can-i-figure-out-why-a-program-failed">How can I figure out why a program failed?</h3> +<p>However, Flink integrates very well with many Hadoop components, for example <em>HDFS</em>, <em>YARN</em>, or <em>HBase</em>. +When running together with these components, Flink can use HDFS to read data, or write results and checkpoints/snapshots. 
+Flink can be easily deployed via YARN and integrates with the YARN and HDFS Kerberos security modules.</p> -<ul> - <li>The JobManager web frontend (by default on port 8081) displays the exceptions -of failed tasks.</li> - <li>If you run the program from the command-line, task exceptions are printed to -the standard error stream and shown on the console.</li> - <li>Both the command line and the web interface allow you to figure out which -parallel task first failed and caused the other tasks to cancel the execution.</li> - <li>Failing tasks and the corresponding exceptions are reported in the log files -of the master and the worker where the exception occurred -(<code>log/flink-<user>-jobmanager-<host>.log</code> and -<code>log/flink-<user>-taskmanager-<host>.log</code>).</li> -</ul> +<h2 id="what-other-stacks-does-flink-run-in">What other stacks does Flink run in?</h2> -<h3 id="how-do-i-debug-flink-programs">How do I debug Flink programs?</h3> +<p>Users run Flink on <a href="https://kubernetes.io">Kubernetes</a>, <a href="https://mesos.apache.org/">Mesos</a>, +<a href="https://www.docker.com/">Docker</a>, or even as standalone services.</p> + +<h2 id="what-are-the-prerequisites-to-use-flink">What are the prerequisites to use Flink?</h2> <ul> - <li>When you start a program locally with the <a href="http://ci.apache.org/projects/flink/flink-docs-master/dev/local_execution.html">LocalExecutor</a>, -you can place breakpoints in your functions and debug them like normal -Java/Scala programs.</li> - <li>The <a href="http://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#accumulators--counters">Accumulators</a> are very helpful in -tracking the behavior of the parallel execution. They allow you to gather -information inside the programâs operations and show them after the program -execution.</li> + <li>You need <em>Java 8</em> to run Flink jobs/applications.</li> + <li>The Scala API (optional) depends on Scala 2.11.</li> + <li>Highly-available setups with no single point of failure require <a href="https://zookeeper.apache.org/">Apache ZooKeeper</a>.</li> + <li>For highly-available stream processing setups that can recover from failures, Flink requires some form of distributed storage for checkpoints (HDFS / S3 / NFS / SAN / GFS / Kosmos / Ceph / â¦).</li> </ul> -<h3 id="what-is-the-parallelism-how-do-i-set-it">What is the parallelism? How do I set it?</h3> +<h2 id="what-scale-does-flink-support">What scale does Flink support?</h2> -<p>In Flink programs, the parallelism determines how operations are split into -individual tasks which are assigned to task slots. Each node in a cluster has at -least one task slot. The total number of task slots is the number of all task slots -on all machines. If the parallelism is set to <code>N</code>, Flink tries to divide an -operation into <code>N</code> parallel tasks which can be computed concurrently using the -available task slots. The number of task slots should be equal to the -parallelism to ensure that all tasks can be computed in a task slot concurrently.</p> +<p>Users are running Flink jobs both in very small setups (fewer than 5 nodes) and on 1000s of nodes and with TBs of state.</p> -<p><strong>Note</strong>: Not all operations can be divided into multiple tasks. For example, a -<code>GroupReduce</code> operation without a grouping has to be performed with a -parallelism of 1 because the entire group needs to be present at exactly one -node to perform the reduce operation. 
Flink will determine whether the -parallelism has to be 1 and set it accordingly.</p> +<h2 id="is-flink-limited-to-in-memory-data-sets">Is Flink limited to in-memory data sets?</h2> -<p>The parallelism can be set in numerous ways to ensure a fine-grained control -over the execution of a Flink program. See -the <a href="http://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#common-options">Configuration guide</a> for detailed instructions on how to -set the parallelism. Also check out <a href="http://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#configuring-taskmanager-processing-slots">this figure</a> detailing -how the processing slots and parallelism are related to each other.</p> +<p>For the DataStream API, Flink supports larger-than-memory state be configuring the RocksDB state backend.</p> -<h2 id="errors">Errors</h2> +<p>For the DataSet API, all operations (except delta-iterations) can scale beyond main memory.</p> -<h3 id="why-am-i-getting-a-nonserializableexception-">Why am I getting a âNonSerializableExceptionâ ?</h3> +<h1 id="debugging-and-common-error-messages">Debugging and Common Error Messages</h1> -<p>All functions in Flink must be serializable, as defined by <a href="http://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html">java.io.Serializable</a>. -Since all function interfaces are serializable, the exception means that one -of the fields used in your function is not serializable.</p> +<h2 id="i-have-a-notserializableexception">I have a NotSerializableException.</h2> -<p>In particular, if your function is an inner class, or anonymous inner class, -it contains a hidden reference to the enclosing class (usually called <code>this$0</code>, if you look -at the function in the debugger). If the enclosing class is not serializable, this is probably -the source of the error. Solutions are to</p> +<p>Flink uses Java serialization to distribute copies of the application logic (the functions and operations you implement, +as well as the program configuration, etc.) to the parallel worker processes. +Because of that, all functions that you pass to the API must be serializable, as defined by +<a href="http://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html">java.io.Serializable</a>.</p> -<ul> - <li>make the function a standalone class, or a static inner class (no more reference to the enclosing class)</li> - <li>make the enclosing class serializable</li> - <li>use a Java 8 lambda function.</li> -</ul> +<p>If your function is an anonymous inner class, consider the following: + - make the function a standalone class, or a static inner class + - use a Java 8 lambda function.</p> -<h3 id="in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters">In Scala API, I get an error about implicit values and evidence parameters</h3> +<p>Is your function is already a static class, check the fields that you assign when you create +an instance of the class. One of the fields most likely holds a non-serializable type. + - In Java, use a <code>RichFunction</code> and initialize the problematic fields in the <code>open()</code> method. + - In Scala, you can often simply use âlazy valâ to defer initialization until the distributed execution happens. This may come at a minor performance cost. You can naturally also use a <code>RichFunction</code> in Scala.</p> -<p>It means that the implicit value for the type information could not be provided. 
-Make sure that you have an <code>import org.apache.flink.api.scala._</code> statement in your code.</p> +<h2 id="using-the-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters">Using the Scala API, I get an error about implicit values and evidence parameters.</h2> -<p>If you are using flink operations inside functions or classes that take -generic parameters a TypeInformation must be available for that parameter. +<p>This error means that the implicit value for the type information could not be provided. +Make sure that you have an <code>import org.apache.flink.streaming.api.scala._</code> (DataStream API) or an +<code>import org.apache.flink.api.scala._</code> (DataSet API) statement in your code.</p> + +<p>If you are using Flink operations inside functions or classes that take +generic parameters, then a TypeInformation must be available for that parameter. This can be achieved by using a context bound:</p> <div class="highlight"><pre><code class="language-scala"><span class="k">def</span> <span class="n">myFunction</span><span class="o">[</span><span class="kt">T:</span> <span class="kt">TypeInformation</span><span class="o">](</span><span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">T</span><span class="o">])</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Seq</span><span class="o">[</span><span class="kt">T</span><span class="o">]]</span> <span class="k">=</span> <span class="o">{</span> @@ -328,316 +278,66 @@ This can be achieved by using a context bound:</p> <p>See <a href="http://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html">Type Extraction and Serialization</a> for an in-depth discussion of how Flink handles types.</p> -<h3 id="i-get-an-error-message-saying-that-not-enough-buffers-are-available-how-do-i-fix-this">I get an error message saying that not enough buffers are available. How do I fix this?</h3> - -<p>If you run Flink in a massively parallel setting (100+ parallel threads), -you need to adapt the number of network buffers via the config parameter -<code>taskmanager.network.numberOfBuffers</code>. -As a rule-of-thumb, the number of buffers should be at least -<code>4 * numberOfTaskManagers * numberOfSlotsPerTaskManager^2</code>. See -<a href="http://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#configuring-the-network-buffers">Configuration Reference</a> for details.</p> - -<h3 id="my-job-fails-early-with-a-javaioeofexception-what-could-be-the-cause">My job fails early with a java.io.EOFException. What could be the cause?</h3> - -<p>The most common case for these exception is when Flink is set up with the -wrong HDFS version. 
Because different HDFS versions are often not compatible -with each other, the connection between the filesystem master and the client -breaks.</p> - -<div class="highlight"><pre><code class="language-bash">Call to <host:port> failed on <span class="nb">local </span>exception: java.io.EOFException - at org.apache.hadoop.ipc.Client.wrapException<span class="o">(</span>Client.java:775<span class="o">)</span> - at org.apache.hadoop.ipc.Client.call<span class="o">(</span>Client.java:743<span class="o">)</span> - at org.apache.hadoop.ipc.RPC<span class="nv">$Invoker</span>.invoke<span class="o">(</span>RPC.java:220<span class="o">)</span> - at <span class="nv">$Proxy0</span>.getProtocolVersion<span class="o">(</span>Unknown Source<span class="o">)</span> - at org.apache.hadoop.ipc.RPC.getProxy<span class="o">(</span>RPC.java:359<span class="o">)</span> - at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode<span class="o">(</span>DFSClient.java:106<span class="o">)</span> - at org.apache.hadoop.hdfs.DFSClient.<init><span class="o">(</span>DFSClient.java:207<span class="o">)</span> - at org.apache.hadoop.hdfs.DFSClient.<init><span class="o">(</span>DFSClient.java:170<span class="o">)</span> - at org.apache.hadoop.hdfs.DistributedFileSystem.initialize<span class="o">(</span>DistributedFileSystem.java:82<span class="o">)</span> - at org.apache.flinkruntime.fs.hdfs.DistributedFileSystem.initialize<span class="o">(</span>DistributedFileSystem.java:276</code></pre></div> - -<p>Please refer to the <a href="/downloads.html">download page</a> and -the <a href="https://github.com/apache/flink/tree/master/README.md">build instructions</a> -for details on how to set up Flink for different Hadoop and HDFS versions.</p> - -<h3 id="my-job-fails-with-various-exceptions-from-the-hdfshadoop-code-what-can-i-do">My job fails with various exceptions from the HDFS/Hadoop code. What can I do?</h3> - -<p>Flink is shipping with the Hadoop 2.2 binaries by default. These binaries are used -to connect to HDFS or YARN. -It seems that there are some bugs in the HDFS client which cause exceptions while writing to HDFS -(in particular under high load). -Among the exceptions are the following:</p> - -<ul> - <li><code>HDFS client trying to connect to the standby Namenode "org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby"</code></li> - <li> - <p><code>java.io.IOException: Bad response ERROR for block BP-1335380477-172.22.5.37-1424696786673:blk_1107843111_34301064 from datanode 172.22.5.81:50010 -at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:732)</code></p> - </li> - <li><code>Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): 0 - at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:478) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:6039) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:6002)</code></li> -</ul> - -<p>If you are experiencing any of these, we recommend using a Flink build with a Hadoop version matching -your local HDFS version. 
-You can also manually build Flink against the exact Hadoop version (for example -when using a Hadoop distribution with a custom patch level)</p> - -<h3 id="in-eclipse-i-get-compilation-errors-in-the-scala-projects">In Eclipse, I get compilation errors in the Scala projects</h3> - -<p>Flink uses a new feature of the Scala compiler (called âquasiquotesâ) that have not yet been properly -integrated with the Eclipse Scala plugin. In order to make this feature available in Eclipse, you -need to manually configure the <em>flink-scala</em> project to use a <em>compiler plugin</em>:</p> - -<ul> - <li>Right click on <em>flink-scala</em> and choose âPropertiesâ</li> - <li>Select âScala Compilerâ and click on the âAdvancedâ tab. (If you do not have that, you probably have not set up Eclipse for Scala properly.)</li> - <li>Check the box âUse Project Settingsâ</li> - <li>In the field âXpluginâ, put the path â/home/<user-name>/.m2/repository/org/scalamacros/paradise_2.10.4/2.0.1/paradise_2.10.4-2.0.1.jar"</user-name></li> - <li>NOTE: You have to build Flink with Maven on the command line first, to make sure the plugin is downloaded.</li> -</ul> - -<h3 id="my-program-does-not-compute-the-correct-result-why-are-my-custom-key-types-not-groupedjoined-correctly">My program does not compute the correct result. Why are my custom key types not grouped/joined correctly?</h3> +<h2 id="i-see-a-classcastexception-x-cannot-be-cast-to-x">I see a ClassCastException: X cannot be cast to X.</h2> -<p>Keys must correctly implement the methods <code>java.lang.Object#hashCode()</code>, -<code>java.lang.Object#equals(Object o)</code>, and <code>java.util.Comparable#compareTo(...)</code>. -These methods are always backed with default implementations which are usually -inadequate. Therefore, all keys must override <code>hashCode()</code> and <code>equals(Object o)</code>.</p> +<p>When you see an exception in the style <code>com.foo.X</code> cannot be cast to <code>com.foo.X</code> (or cannot be assigned to <code>com.foo.X</code>), it means that +multiple versions of the class <code>com.foo.X</code> have been loaded by different class loaders, and types of that class are attempted to be assigned to each other.</p> -<h3 id="i-get-a-javalanginstantiationexception-for-my-data-type-what-is-wrong">I get a java.lang.InstantiationException for my data type, what is wrong?</h3> - -<p>All data type classes must be public and have a public nullary constructor -(constructor with no arguments). Further more, the classes must not be abstract -or interfaces. If the classes are internal classes, they must be public and -static.</p> - -<h3 id="i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do">I canât stop Flink with the provided stop-scripts. What can I do?</h3> - -<p>Stopping the processes sometimes takes a few seconds, because the shutdown may -do some cleanup work.</p> - -<p>In some error cases it happens that the JobManager or TaskManager cannot be -stopped with the provided stop-scripts (<code>bin/stop-local.sh</code> or <code>bin/stop- -cluster.sh</code>). You can kill their processes on Linux/Mac as follows:</p> +<p>The reason for that can be:</p> <ul> - <li>Determine the process id (pid) of the JobManager / TaskManager process. 
You -can use the <code>jps</code> command on Linux(if you have OpenJDK installed) or command -<code>ps -ef | grep java</code> to find all Java processes.</li> - <li>Kill the process with <code>kill -9 <pid></code>, where <code>pid</code> is the process id of the -affected JobManager or TaskManager process.</li> -</ul> - -<p>On Windows, the TaskManager shows a table of all processes and allows you to -destroy a process by right its entry.</p> - -<p>Both the JobManager and TaskManager services will write signals (like SIGKILL -and SIGTERM) into their respective log files. This can be helpful for -debugging issues with the stopping behavior.</p> - -<h3 id="i-got-an-outofmemoryexception-what-can-i-do">I got an OutOfMemoryException. What can I do?</h3> - -<p>These exceptions occur usually when the functions in the program consume a lot -of memory by collection large numbers of objects, for example in lists or maps. -The OutOfMemoryExceptions in Java are kind of tricky. The exception is not -necessarily thrown by the component that allocated most of the memory but by the -component that tried to requested the latest bit of memory that could not be -provided.</p> - -<p>There are two ways to go about this:</p> - -<ol> <li> - <p>See whether you can use less memory inside the functions. For example, use -arrays of primitive types instead of object types.</p> + <p>Class duplication through <code>child-first</code> classloading. That is an intended mechanism to allow users to use different versions of the same +dependencies that Flink uses. However, if different copies of these classes move between Flinkâs core and the user application code, such an exception +can occur. To verify that this is the reason, try setting <code>classloader.resolve-order: parent-first</code> in the configuration. +If that makes the error disappear, please write to the mailing list to check if that may be a bug.</p> </li> <li> - <p>Reduce the memory that Flink reserves for its own processing. The -TaskManager reserves a certain portion of the available memory for sorting, -hashing, caching, network buffering, etc. That part of the memory is unavailable -to the user-defined functions. By reserving it, the system can guarantee to not -run out of memory on large inputs, but to plan with the available memory and -destage operations to disk, if necessary. By default, the system reserves around -70% of the memory. If you frequently run applications that need more memory in -the user-defined functions, you can reduce that value using the configuration -entries <code>taskmanager.memory.fraction</code> or <code>taskmanager.memory.size</code>. See the -<a href="http://ci.apache.org/projects/flink/flink-docs-master/ops/config.html">Configuration Reference</a> for details. This will leave more memory to JVM heap, -but may cause data processing tasks to go to disk more often.</p> + <p>Caching of classes from different execution attempts, for example by utilities like Guavaâs Interners, or Avroâs Schema cache. +Try to not use interners, or reduce the scope of the interner/cache to make sure a new cache is created whenever a new task +execution is started.</p> </li> -</ol> - -<p>Another reason for OutOfMemoryExceptions is the use of the wrong state backend. -By default, Flink is using a heap-based state backend for operator state in -streaming jobs. 
The <code>RocksDBStateBackend</code> allows state sizes larger than the -available heap space.</p> - -<h3 id="why-do-the-taskmanager-log-files-become-so-huge">Why do the TaskManager log files become so huge?</h3> +</ul> -<p>Check the logging behavior of your jobs. Emitting logging per object or tuple may be -helpful to debug jobs in small setups with tiny data sets but can limit performance -and consume substantial disk space if used for large input data.</p> +<h2 id="i-have-an-abstractmethoderror-or-nosuchfielderror">I have an AbstractMethodError or NoSuchFieldError.</h2> -<h3 id="the-slot-allocated-for-my-task-manager-has-been-released-what-should-i-do">The slot allocated for my task manager has been released. What should I do?</h3> +<p>Such errors typically indicate a mix-up in some dependency version. That means a different version of a dependency (a library) +is loaded during the execution compared to the version that code was compiled against.</p> -<p>If you see a <code>java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager</code> even though the TaskManager did actually not crash, it -means that the TaskManager was unresponsive for a time. That can be due to network issues, but is frequently due to long garbage collection stalls. -In this case, a quick fix would be to use an incremental Garbage Collector, like the G1 garbage collector. It usually leads to shorter pauses. Furthermore, you can dedicate more memory to -the user code by reducing the amount of memory Flink grabs for its internal operations (see configuration of TaskManager managed memory).</p> +<p>From Flink 1.4.0 on, dependencies in your application JAR file may have different versions compared to dependencies used +by Flinkâs core, or other dependencies in the classpath (for example from Hadoop). That requires <code>child-first</code> classloading +to be activated, which is the default.</p> -<p>If both of these approaches fail and the error persists, simply increase the TaskManagerâs heartbeat pause by setting AKKA_WATCH_HEARTBEAT_PAUSE (akka.watch.heartbeat.pause) to a greater value (e.g. 600s). -This will cause the JobManager to wait for a heartbeat for a longer time interval before considering the TaskManager lost.</p> +<p>If you see these problems in Flink 1.4+, one of the following may be true: + - You have a dependency version conflict within your application code. Make sure all your dependency versions are consistent. + - You are conflicting with a library that Flink cannot support via <code>child-first</code> classloading. Currently these are the + Scala standard library classes, as well as Flinkâs own classes, logging APIs, and any Hadoop core classes.</p> -<h2 id="yarn-deployment">YARN Deployment</h2> +<h2 id="my-datastream-application-produces-no-output-even-though-events-are-going-in">My DataStream application produces no output, even though events are going in.</h2> -<h3 id="the-yarn-session-runs-only-for-a-few-seconds">The YARN session runs only for a few seconds</h3> +<p>If your DataStream application uses <em>Event Time</em>, check that your watermarks get updated. If no watermarks are produced, +event time windows might never trigger, and the application would produce no results.</p> -<p>The <code>./bin/yarn-session.sh</code> script is intended to run while the YARN-session is -open. In some error cases however, the script immediately stops running. 
The -output looks like this:</p> +<p>You can check in Flinkâs web UI (watermarks section) whether watermarks are making progress.</p> -<div class="highlight"><pre><code>07:34:27,004 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1395604279745_273123 to ResourceManager at jobtracker-host -Flink JobManager is now running on worker1:6123 -JobManager Web Interface: http://jobtracker-host:54311/proxy/application_1295604279745_273123/ -07:34:51,528 INFO org.apache.flinkyarn.Client - Application application_1295604279745_273123 finished with state FINISHED at 1398152089553 -07:34:51,529 INFO org.apache.flinkyarn.Client - Killing the Flink-YARN application. -07:34:51,529 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Killing application application_1295604279745_273123 -07:34:51,534 INFO org.apache.flinkyarn.Client - Deleting files in hdfs://user/marcus/.flink/application_1295604279745_273123 -07:34:51,559 INFO org.apache.flinkyarn.Client - YARN Client is shutting down -</code></pre></div> +<h2 id="i-see-an-exception-reporting-insufficient-number-of-network-buffers">I see an exception reporting âInsufficient number of network buffersâ.</h2> -<p>The problem here is that the Application Master (AM) is stopping and the YARN client assumes that the application has finished.</p> +<p>If you run Flink with a very high parallelism, you may need to increase the number of network buffers.</p> -<p>There are three possible reasons for that behavior:</p> +<p>By default, Flink takes 10% of the JVM heap size for network buffers, with a minimum of 64MB and a maximum of 1GB. +You can adjust all these values via <code>taskmanager.network.memory.fraction</code>, <code>taskmanager.network.memory.min</code>, and +<code>taskmanager.network.memory.max</code>.</p> -<ul> - <li> - <p>The ApplicationMaster exited with an exception. To debug that error, have a -look in the logfiles of the container. The <code>yarn-site.xml</code> file contains the -configured path. The key for the path is <code>yarn.nodemanager.log-dirs</code>, the -default value is <code>${yarn.log.dir}/userlogs</code>.</p> - </li> - <li> - <p>YARN has killed the container that runs the ApplicationMaster. This case -happens when the AM used too much memory or other resources beyond YARNâs -limits. In this case, youâll find error messages in the nodemanager logs on -the host.</p> - </li> - <li> - <p>The operating system has shut down the JVM of the AM. This can happen if the -YARN configuration is wrong and more memory than physically available is -configured. Execute <code>dmesg</code> on the machine where the AM was running to see if -this happened. You see messages from Linuxâ <a href="http://linux-mm.org/OOM_Killer">OOM killer</a>.</p> - </li> -</ul> - -<h3 id="my-yarn-containers-are-killed-because-they-use-too-much-memory">My YARN containers are killed because they use too much memory</h3> +<p>Please refer to the <a href="http://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#configuring-the-network-buffers">Configuration Reference</a> for details.</p> -<p>This is usually indicated my a log message like the following one:</p> +<h2 id="my-job-fails-with-various-exceptions-from-the-hdfshadoop-code-what-can-i-do">My job fails with various exceptions from the HDFS/Hadoop code. 
What can I do?</h2> -<div class="highlight"><pre><code>Container container_e05_1467433388200_0136_01_000002 is completed with diagnostics: Container [pid=5832,containerID=container_e05_1467433388200_0136_01_000002] is running beyond physical memory limits. Current usage: 2.3 GB of 2 GB physical memory used; 6.1 GB of 4.2 GB virtual memory used. Killing container. -</code></pre></div> +<p>The most common cause for that is that the Hadoop version in Flinkâs classpath is different than the +Hadoop version of the cluster you want to connect to (HDFS / YARN).</p> -<p>In that case, the JVM process grew too large. Because the Java heap size is always limited, the extra memory typically comes from non-heap sources:</p> - -<ul> - <li>Libraries that use off-heap memory. (Flinkâs own off-heap memory is limited and taken into account when calculating the allowed heap size.)</li> - <li>PermGen space (strings and classes), code caches, memory mapped jar files</li> - <li>Native libraries (RocksDB)</li> -</ul> +<p>The easiest way to fix that is to pick a Hadoop-free Flink version and simply export the Hadoop path and +classpath from the cluster.</p> -<p>You can activate the <a href="http://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#memory-and-performance-debugging">memory debug logger</a> to get more insight into what memory pool is actually using up too much memory.</p> - -<h3 id="the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup">The YARN session crashes with a HDFS permission exception during startup</h3> - -<p>While starting the YARN session, you are receiving an exception like this:</p> - -<div class="highlight"><pre><code>Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=robert, access=WRITE, inode="/user/robert":hdfs:supergroup:drwxr-xr-x - at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:234) - at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:214) - at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:158) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5193) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5175) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:5149) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2090) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996) - at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491) - at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301) - at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570) - at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) - at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928) - at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053) - at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) - at java.security.AccessController.doPrivileged(Native Method) - at 
javax.security.auth.Subject.doAs(Subject.java:396) - at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491) - at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047) - - at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) - at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) - at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27) - at java.lang.reflect.Constructor.newInstance(Constructor.java:513) - at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) - at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) - at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1393) - at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1382) - at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1307) - at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:384) - at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:380) - at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) - at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:380) - at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:324) - at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) - at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886) - at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783) - at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:365) - at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338) - at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2021) - at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1989) - at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1954) - at org.apache.flinkyarn.Utils.setupLocalResource(Utils.java:176) - at org.apache.flinkyarn.Client.run(Client.java:362) - at org.apache.flinkyarn.Client.main(Client.java:568) -</code></pre></div> - -<p>The reason for this error is, that the home directory of the user <strong>in HDFS</strong> -has the wrong permissions. The user (in this case <code>robert</code>) can not create -directories in his own home directory.</p> - -<p>Flink creates a <code>.flink/</code> directory in the users home directory -where it stores the Flink jar and configuration file.</p> - -<h3 id="my-job-is-not-reacting-to-a-job-cancellation">My job is not reacting to a job cancellation?</h3> - -<p>Flink is canceling a job by calling the <code>cancel()</code> method on all user tasks. Ideally, -the tasks properly react to the call and stop what they are currently doing, so that -all threads can shut down.</p> - -<p>If the tasks are not reacting for a certain amount of time, Flink will start interrupting -the thread periodically.</p> - -<p>The TaskManager logs will also contain the current stack of the method where the user -code is blocked.</p> - -<h2 id="features">Features</h2> - -<h3 id="what-kind-of-fault-tolerance-does-flink-provide">What kind of fault-tolerance does Flink provide?</h3> - -<p>For streaming programs Flink has a novel approach to draw periodic snapshots of the streaming dataflow state and use those for recovery. -This mechanism is both efficient and flexible. 
See the documentation on <a href="http://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html">streaming fault tolerance</a> for details.</p> - -<p>For batch processing programs Flink remembers the programâs sequence of transformations and can restart failed jobs.</p> - -<h3 id="are-hadoop-like-utilities-such-as-counters-and-the-distributedcache-supported">Are Hadoop-like utilities, such as Counters and the DistributedCache supported?</h3> - -<p><a href="http://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#accumulators--counters">Flinkâs Accumulators</a> work very similar like -Hadoopâs counters, but are more powerful.</p> - -<p>Flink has a <a href="https://github.com/apache/flink/tree/master/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java">Distributed Cache</a> that is deeply integrated with the APIs. Please refer to the <a href="https://github.com/apache/flink/tree/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L831">JavaDocs</a> for details on how to use it.</p> - -<p>In order to make data sets available on all tasks, we encourage you to use <a href="http://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html#broadcast-variables">Broadcast Variables</a> instead. They are more efficient and easier to use than the distributed cache.</p> </div> http://git-wip-us.apache.org/repos/asf/flink-web/blob/efd6ccae/faq.md ---------------------------------------------------------------------- diff --git a/faq.md b/faq.md index 7aeba85..ad0ef4c 100755 --- a/faq.md +++ b/faq.md @@ -20,122 +20,95 @@ specific language governing permissions and limitations under the License. --> -The following questions are frequently asked with regard to the Flink project **in general**. If you have further questions, make sure to consult the [documentation]({{site.docs-snapshot}}) or [ask the community]({{ site.baseurl }}/community.html). +The following questions are frequently asked with regard to the Flink project **in general**. +If you have further questions, make sure to consult the [documentation]({{site.docs-snapshot}}) or [ask the community]({{ site.baseurl }}/community.html). {% toc %} +# General +## Is Apache Flink only for (near) real-time processing use cases? +Flink is a very general system for data processing and data-driven applications with *data streams* as +the core building block. These data streams can be streams of real-time data, or stored streams of historic data. +For example, in Flink's view a file is a stored stream of bytes. Because of that, Flink +supports both real-time data processing and applications, as well as batch processing applications. -## General +Streams can be *unbounded* (have no end, events continuously keep coming) or be *bounded* (streams have a beginning +and an end). For example, a Twitter feed or a stream of events from a message queue are generally unbounded streams, +whereas a stream of bytes from a file is a bounded stream. -### Is Flink a Hadoop Project? +## If everything is a stream, why are there a DataStream and a DataSet API in Flink? -Flink is a data processing system and an **alternative to Hadoop's -MapReduce component**. It comes with its *own runtime* rather than building on top -of MapReduce. As such, it can work completely independently of the Hadoop -ecosystem. 
However, Flink can also access Hadoop's distributed file -system (HDFS) to read and write data, and Hadoop's next-generation resource -manager (YARN) to provision cluster resources. Since most Flink users are -using Hadoop HDFS to store their data, Flink already ships the required libraries to -access HDFS. +Bounded streams are often more efficient to process than unbounded streams. Processing unbounded streams of events +in (near) real-time requires the system to be able to immediately act on events and to produce intermediate +results (often with low latency). Processing bounded streams usually does not require to produce low latency results, because the data is a while old +anyway (in relative terms). That allows Flink to process the data in a simple and more efficient way. -### Do I have to install Apache Hadoop to use Flink? +The *DataStream* API captures the continuous processing of unbounded and bounded streams, with a model that supports +low latency results and flexible reaction to events and time (including event time). -**No**. Flink can run **without** a Hadoop installation. However, a *very common* -setup is to use Flink to analyze data stored in the Hadoop Distributed -File System (HDFS). To make these setups work out of the box, Flink bundles the -Hadoop client libraries by default. +The *DataSet* API has techniques that often speed up the processing of bounded data streams. In the future, the community +plans to combine these optimizations with the techniques in the DataStream API. -Additionally, we provide a special YARN Enabled download of Flink for -users with an existing Hadoop YARN cluster. [Apache Hadoop -YARN](http://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html) -is Hadoop's cluster resource manager that allows use of -different execution engines next to each other on a cluster. +## How does Flink relate to the Hadoop Stack? -## Usage +Flink is independent of [Apache Hadoop](https://hadoop.apache.org/) and runs without any Hadoop dependencies. -### How do I assess the progress of a Flink program? +However, Flink integrates very well with many Hadoop components, for example *HDFS*, *YARN*, or *HBase*. +When running together with these components, Flink can use HDFS to read data, or write results and checkpoints/snapshots. +Flink can be easily deployed via YARN and integrates with the YARN and HDFS Kerberos security modules. -There are a multiple of ways to track the progress of a Flink program: +## What other stacks does Flink run in? -- The JobManager (the master of the distributed system) starts a web interface -to observe program execution. It runs on port 8081 by default (configured in -`conf/flink-config.yml`). -- When you start a program from the command line, it will print the status -changes of all operators as the program progresses through the operations. -- All status changes are also logged to the JobManager's log file. +Users run Flink on [Kubernetes](https://kubernetes.io), [Mesos](https://mesos.apache.org/), +[Docker](https://www.docker.com/), or even as standalone services. -### How can I figure out why a program failed? +## What are the prerequisites to use Flink? -- The JobManager web frontend (by default on port 8081) displays the exceptions -of failed tasks. -- If you run the program from the command-line, task exceptions are printed to -the standard error stream and shown on the console. 
-- Both the command line and the web interface allow you to figure out which -parallel task first failed and caused the other tasks to cancel the execution. -- Failing tasks and the corresponding exceptions are reported in the log files -of the master and the worker where the exception occurred -(`log/flink-<user>-jobmanager-<host>.log` and -`log/flink-<user>-taskmanager-<host>.log`). + - You need *Java 8* to run Flink jobs/applications. + - The Scala API (optional) depends on Scala 2.11. + - Highly-available setups with no single point of failure require [Apache ZooKeeper](https://zookeeper.apache.org/). + - For highly-available stream processing setups that can recover from failures, Flink requires some form of distributed storage for checkpoints (HDFS / S3 / NFS / SAN / GFS / Kosmos / Ceph / ...). -### How do I debug Flink programs? +## What scale does Flink support? -- When you start a program locally with the [LocalExecutor]({{site.docs-snapshot}}/dev/local_execution.html), -you can place breakpoints in your functions and debug them like normal -Java/Scala programs. -- The [Accumulators]({{ site.docs-snapshot }}/dev/api_concepts.html#accumulators--counters) are very helpful in -tracking the behavior of the parallel execution. They allow you to gather -information inside the program's operations and show them after the program -execution. +Users are running Flink jobs both in very small setups (fewer than 5 nodes) and on 1000s of nodes and with TBs of state. -### What is the parallelism? How do I set it? +## Is Flink limited to in-memory data sets? -In Flink programs, the parallelism determines how operations are split into -individual tasks which are assigned to task slots. Each node in a cluster has at -least one task slot. The total number of task slots is the number of all task slots -on all machines. If the parallelism is set to `N`, Flink tries to divide an -operation into `N` parallel tasks which can be computed concurrently using the -available task slots. The number of task slots should be equal to the -parallelism to ensure that all tasks can be computed in a task slot concurrently. +For the DataStream API, Flink supports larger-than-memory state by configuring the RocksDB state backend. -**Note**: Not all operations can be divided into multiple tasks. For example, a -`GroupReduce` operation without a grouping has to be performed with a -parallelism of 1 because the entire group needs to be present at exactly one -node to perform the reduce operation. Flink will determine whether the -parallelism has to be 1 and set it accordingly. +For the DataSet API, all operations (except delta-iterations) can scale beyond main memory. -The parallelism can be set in numerous ways to ensure a fine-grained control -over the execution of a Flink program. See -the [Configuration guide]({{ site.docs-snapshot }}/ops/config.html#common-options) for detailed instructions on how to -set the parallelism. Also check out [this figure]({{ site.docs-snapshot }}/ops/config.html#configuring-taskmanager-processing-slots) detailing -how the processing slots and parallelism are related to each other. -## Errors +# Debugging and Common Error Messages -### Why am I getting a "NonSerializableException" ? +## I have a NotSerializableException. -All functions in Flink must be serializable, as defined by [java.io.Serializable](http://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html). 
-Since all function interfaces are serializable, the exception means that one -of the fields used in your function is not serializable. +Flink uses Java serialization to distribute copies of the application logic (the functions and operations you implement, +as well as the program configuration, etc.) to the parallel worker processes. +Because of that, all functions that you pass to the API must be serializable, as defined by +[java.io.Serializable](http://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html). -In particular, if your function is an inner class, or anonymous inner class, -it contains a hidden reference to the enclosing class (usually called `this$0`, if you look -at the function in the debugger). If the enclosing class is not serializable, this is probably -the source of the error. Solutions are to +If your function is an anonymous inner class, consider the following: + - make the function a standalone class, or a static inner class + - use a Java 8 lambda function. -- make the function a standalone class, or a static inner class (no more reference to the enclosing class) -- make the enclosing class serializable -- use a Java 8 lambda function. - -### In Scala API, I get an error about implicit values and evidence parameters - -It means that the implicit value for the type information could not be provided. -Make sure that you have an `import org.apache.flink.api.scala._` statement in your code. - -If you are using flink operations inside functions or classes that take -generic parameters a TypeInformation must be available for that parameter. +If your function is already a static class, check the fields that you assign when you create +an instance of the class. One of the fields most likely holds a non-serializable type. + - In Java, use a `RichFunction` and initialize the problematic fields in the `open()` method. + - In Scala, you can often simply use `lazy val` to defer initialization until the distributed execution happens. This may come at a minor performance cost. You can naturally also use a `RichFunction` in Scala. + +## Using the Scala API, I get an error about implicit values and evidence parameters. + +This error means that the implicit value for the type information could not be provided. +Make sure that you have an `import org.apache.flink.streaming.api.scala._` (DataStream API) or an +`import org.apache.flink.api.scala._` (DataSet API) statement in your code. + +If you are using Flink operations inside functions or classes that take +generic parameters, then a TypeInformation must be available for that parameter. This can be achieved by using a context bound: ~~~scala @@ -147,303 +120,59 @@ def myFunction[T: TypeInformation](input: DataSet[T]): DataSet[Seq[T]] = { See [Type Extraction and Serialization]({{ site.docs-snapshot }}/dev/types_serialization.html) for an in-depth discussion of how Flink handles types. -### I get an error message saying that not enough buffers are available. How do I fix this? - -If you run Flink in a massively parallel setting (100+ parallel threads), -you need to adapt the number of network buffers via the config parameter -`taskmanager.network.numberOfBuffers`. -As a rule-of-thumb, the number of buffers should be at least -`4 * numberOfTaskManagers * numberOfSlotsPerTaskManager^2`. See -[Configuration Reference]({{ site.docs-snapshot }}/ops/config.html#configuring-the-network-buffers) for details. - -### My job fails early with a java.io.EOFException. What could be the cause?
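To illustrate the `RichFunction` / `open()` pattern recommended above for non-serializable fields, here is a minimal sketch; `LegacyParser` is a made-up stand-in for a third-party class that does not implement `java.io.Serializable`.

~~~scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Stand-in for a non-serializable third-party helper.
class LegacyParser {
  def parse(line: String): String = line.trim
}

class ParsingMapper extends RichMapFunction[String, String] {

  // @transient keeps the field out of Java serialization;
  // open() re-creates it on every parallel worker instance.
  @transient private var parser: LegacyParser = _

  override def open(parameters: Configuration): Unit = {
    parser = new LegacyParser()
  }

  override def map(line: String): String = parser.parse(line)
}
~~~

In Scala, a `@transient lazy val parser = new LegacyParser()` field achieves the same effect without an explicit `open()` method, at the cost of a lazy-initialization check on first access.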
- -The most common case for these exception is when Flink is set up with the -wrong HDFS version. Because different HDFS versions are often not compatible -with each other, the connection between the filesystem master and the client -breaks. - -~~~bash -Call to <host:port> failed on local exception: java.io.EOFException - at org.apache.hadoop.ipc.Client.wrapException(Client.java:775) - at org.apache.hadoop.ipc.Client.call(Client.java:743) - at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220) - at $Proxy0.getProtocolVersion(Unknown Source) - at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359) - at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:106) - at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:207) - at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:170) - at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82) - at org.apache.flinkruntime.fs.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:276 -~~~ - -Please refer to the [download page]({{ site.baseurl }}/downloads.html) and -the {% github README.md master "build instructions" %} -for details on how to set up Flink for different Hadoop and HDFS versions. - - -### My job fails with various exceptions from the HDFS/Hadoop code. What can I do? - -Flink is shipping with the Hadoop 2.2 binaries by default. These binaries are used -to connect to HDFS or YARN. -It seems that there are some bugs in the HDFS client which cause exceptions while writing to HDFS -(in particular under high load). -Among the exceptions are the following: - -- `HDFS client trying to connect to the standby Namenode "org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby"` -- `java.io.IOException: Bad response ERROR for block BP-1335380477-172.22.5.37-1424696786673:blk_1107843111_34301064 from datanode 172.22.5.81:50010 - at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:732)` - -- `Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): 0 - at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:478) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:6039) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:6002)` - -If you are experiencing any of these, we recommend using a Flink build with a Hadoop version matching -your local HDFS version. -You can also manually build Flink against the exact Hadoop version (for example -when using a Hadoop distribution with a custom patch level) - -### In Eclipse, I get compilation errors in the Scala projects - -Flink uses a new feature of the Scala compiler (called "quasiquotes") that have not yet been properly -integrated with the Eclipse Scala plugin. In order to make this feature available in Eclipse, you -need to manually configure the *flink-scala* project to use a *compiler plugin*: - -- Right click on *flink-scala* and choose "Properties" -- Select "Scala Compiler" and click on the "Advanced" tab. (If you do not have that, you probably have not set up Eclipse for Scala properly.) 
-- Check the box "Use Project Settings" -- In the field "Xplugin", put the path "/home/<user-name>/.m2/repository/org/scalamacros/paradise_2.10.4/2.0.1/paradise_2.10.4-2.0.1.jar" -- NOTE: You have to build Flink with Maven on the command line first, to make sure the plugin is downloaded. - -### My program does not compute the correct result. Why are my custom key types not grouped/joined correctly? - -Keys must correctly implement the methods `java.lang.Object#hashCode()`, -`java.lang.Object#equals(Object o)`, and `java.util.Comparable#compareTo(...)`. -These methods are always backed with default implementations which are usually -inadequate. Therefore, all keys must override `hashCode()` and `equals(Object o)`. - -### I get a java.lang.InstantiationException for my data type, what is wrong? - -All data type classes must be public and have a public nullary constructor -(constructor with no arguments). Further more, the classes must not be abstract -or interfaces. If the classes are internal classes, they must be public and -static. - -### I can't stop Flink with the provided stop-scripts. What can I do? - -Stopping the processes sometimes takes a few seconds, because the shutdown may -do some cleanup work. - -In some error cases it happens that the JobManager or TaskManager cannot be -stopped with the provided stop-scripts (`bin/stop-local.sh` or `bin/stop- -cluster.sh`). You can kill their processes on Linux/Mac as follows: - -- Determine the process id (pid) of the JobManager / TaskManager process. You -can use the `jps` command on Linux(if you have OpenJDK installed) or command -`ps -ef | grep java` to find all Java processes. -- Kill the process with `kill -9 <pid>`, where `pid` is the process id of the -affected JobManager or TaskManager process. - -On Windows, the TaskManager shows a table of all processes and allows you to -destroy a process by right its entry. - -Both the JobManager and TaskManager services will write signals (like SIGKILL -and SIGTERM) into their respective log files. This can be helpful for -debugging issues with the stopping behavior. - -### I got an OutOfMemoryException. What can I do? - -These exceptions occur usually when the functions in the program consume a lot -of memory by collection large numbers of objects, for example in lists or maps. -The OutOfMemoryExceptions in Java are kind of tricky. The exception is not -necessarily thrown by the component that allocated most of the memory but by the -component that tried to requested the latest bit of memory that could not be -provided. - -There are two ways to go about this: - -1. See whether you can use less memory inside the functions. For example, use -arrays of primitive types instead of object types. - -2. Reduce the memory that Flink reserves for its own processing. The -TaskManager reserves a certain portion of the available memory for sorting, -hashing, caching, network buffering, etc. That part of the memory is unavailable -to the user-defined functions. By reserving it, the system can guarantee to not -run out of memory on large inputs, but to plan with the available memory and -destage operations to disk, if necessary. By default, the system reserves around -70% of the memory. If you frequently run applications that need more memory in -the user-defined functions, you can reduce that value using the configuration -entries `taskmanager.memory.fraction` or `taskmanager.memory.size`. See the -[Configuration Reference]({{ site.docs-snapshot }}/ops/config.html) for details. 
This will leave more memory to JVM heap, -but may cause data processing tasks to go to disk more often. +## I see a ClassCastException: X cannot be cast to X. -Another reason for OutOfMemoryExceptions is the use of the wrong state backend. -By default, Flink is using a heap-based state backend for operator state in -streaming jobs. The `RocksDBStateBackend` allows state sizes larger than the -available heap space. +When you see an exception in the style `com.foo.X` cannot be cast to `com.foo.X` (or cannot be assigned to `com.foo.X`), it means that +multiple versions of the class `com.foo.X` have been loaded by different class loaders, and objects of those different class versions are assigned to each other. -### Why do the TaskManager log files become so huge? +The reason for that can be: -Check the logging behavior of your jobs. Emitting logging per object or tuple may be -helpful to debug jobs in small setups with tiny data sets but can limit performance -and consume substantial disk space if used for large input data. - -### The slot allocated for my task manager has been released. What should I do? - -If you see a `java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager` even though the TaskManager did actually not crash, it -means that the TaskManager was unresponsive for a time. That can be due to network issues, but is frequently due to long garbage collection stalls. -In this case, a quick fix would be to use an incremental Garbage Collector, like the G1 garbage collector. It usually leads to shorter pauses. Furthermore, you can dedicate more memory to -the user code by reducing the amount of memory Flink grabs for its internal operations (see configuration of TaskManager managed memory). - -If both of these approaches fail and the error persists, simply increase the TaskManager's heartbeat pause by setting AKKA_WATCH_HEARTBEAT_PAUSE (akka.watch.heartbeat.pause) to a greater value (e.g. 600s). -This will cause the JobManager to wait for a heartbeat for a longer time interval before considering the TaskManager lost. - -## YARN Deployment - -### The YARN session runs only for a few seconds - -The `./bin/yarn-session.sh` script is intended to run while the YARN-session is -open. In some error cases however, the script immediately stops running. The -output looks like this: - -~~~ -07:34:27,004 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1395604279745_273123 to ResourceManager at jobtracker-host -Flink JobManager is now running on worker1:6123 -JobManager Web Interface: http://jobtracker-host:54311/proxy/application_1295604279745_273123/ -07:34:51,528 INFO org.apache.flinkyarn.Client - Application application_1295604279745_273123 finished with state FINISHED at 1398152089553 -07:34:51,529 INFO org.apache.flinkyarn.Client - Killing the Flink-YARN application. -07:34:51,529 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Killing application application_1295604279745_273123 -07:34:51,534 INFO org.apache.flinkyarn.Client - Deleting files in hdfs://user/marcus/.flink/application_1295604279745_273123 -07:34:51,559 INFO org.apache.flinkyarn.Client - YARN Client is shutting down -~~~ - -The problem here is that the Application Master (AM) is stopping and the YARN client assumes that the application has finished. - -There are three possible reasons for that behavior: - -- The ApplicationMaster exited with an exception. To debug that error, have a -look in the logfiles of the container.
The `yarn-site.xml` file contains the -configured path. The key for the path is `yarn.nodemanager.log-dirs`, the -default value is `${yarn.log.dir}/userlogs`. - -- YARN has killed the container that runs the ApplicationMaster. This case -happens when the AM used too much memory or other resources beyond YARN's -limits. In this case, you'll find error messages in the nodemanager logs on -the host. - -- The operating system has shut down the JVM of the AM. This can happen if the -YARN configuration is wrong and more memory than physically available is -configured. Execute `dmesg` on the machine where the AM was running to see if -this happened. You see messages from Linux' [OOM killer](http://linux-mm.org/OOM_Killer). - -### My YARN containers are killed because they use too much memory - -This is usually indicated my a log message like the following one: - -~~~ -Container container_e05_1467433388200_0136_01_000002 is completed with diagnostics: Container [pid=5832,containerID=container_e05_1467433388200_0136_01_000002] is running beyond physical memory limits. Current usage: 2.3 GB of 2 GB physical memory used; 6.1 GB of 4.2 GB virtual memory used. Killing container. -~~~ - -In that case, the JVM process grew too large. Because the Java heap size is always limited, the extra memory typically comes from non-heap sources: - - - Libraries that use off-heap memory. (Flink's own off-heap memory is limited and taken into account when calculating the allowed heap size.) - - PermGen space (strings and classes), code caches, memory mapped jar files - - Native libraries (RocksDB) - -You can activate the [memory debug logger]({{ site.docs-snapshot }}/ops/config.html#memory-and-performance-debugging) to get more insight into what memory pool is actually using up too much memory. 
- - -### The YARN session crashes with a HDFS permission exception during startup - -While starting the YARN session, you are receiving an exception like this: - -~~~ -Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=robert, access=WRITE, inode="/user/robert":hdfs:supergroup:drwxr-xr-x - at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:234) - at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:214) - at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:158) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5193) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5175) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:5149) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2090) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043) - at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996) - at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491) - at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301) - at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570) - at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) - at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928) - at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053) - at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) - at java.security.AccessController.doPrivileged(Native Method) - at javax.security.auth.Subject.doAs(Subject.java:396) - at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491) - at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047) - - at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) - at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) - at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27) - at java.lang.reflect.Constructor.newInstance(Constructor.java:513) - at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) - at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) - at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1393) - at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1382) - at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1307) - at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:384) - at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:380) - at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) - at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:380) - at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:324) - at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) - at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886) - at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783) - at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:365) - at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338) - at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2021) - at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1989) - at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1954) - at org.apache.flinkyarn.Utils.setupLocalResource(Utils.java:176) - at org.apache.flinkyarn.Client.run(Client.java:362) - at org.apache.flinkyarn.Client.main(Client.java:568) -~~~ + - Class duplication through `child-first` classloading. That is an intended mechanism to allow users to use different versions of the same + dependencies that Flink uses. However, if different copies of these classes move between Flink's core and the user application code, such an exception + can occur. To verify that this is the reason, try setting `classloader.resolve-order: parent-first` in the configuration. + If that makes the error disappear, please write to the mailing list to check if that may be a bug. -The reason for this error is, that the home directory of the user **in HDFS** -has the wrong permissions. The user (in this case `robert`) can not create -directories in his own home directory. + - Caching of classes from different execution attempts, for example by utilities like Guava's Interners, or Avro's Schema cache. + Try not to use interners, or reduce the scope of the interner/cache to make sure a new cache is created whenever a new task + execution is started. -Flink creates a `.flink/` directory in the users home directory -where it stores the Flink jar and configuration file. +## I have an AbstractMethodError or NoSuchFieldError. +Such errors typically indicate a version mix-up in some dependency. That means a different version of a dependency (a library) +is loaded during the execution compared to the version that the code was compiled against. -### My job is not reacting to a job cancellation? +From Flink 1.4.0 on, dependencies in your application JAR file may have different versions compared to dependencies used +by Flink's core, or other dependencies in the classpath (for example from Hadoop). That requires `child-first` classloading +to be activated, which is the default. -Flink is canceling a job by calling the `cancel()` method on all user tasks. Ideally, -the tasks properly react to the call and stop what they are currently doing, so that -all threads can shut down. +If you see these problems in Flink 1.4+, one of the following may be true: + - You have a dependency version conflict within your application code. Make sure all your dependency versions are consistent. + - You are conflicting with a library that Flink cannot support via `child-first` classloading. Currently these are the + Scala standard library classes, as well as Flink's own classes, logging APIs, and any Hadoop core classes. -If the tasks are not reacting for a certain amount of time, Flink will start interrupting -the thread periodically. -The TaskManager logs will also contain the current stack of the method where the user -code is blocked. +## My DataStream application produces no output, even though events are going in. +If your DataStream application uses *Event Time*, check that your watermarks get updated. If no watermarks are produced, +event time windows might never trigger, and the application would produce no results.
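A frequent cause of stalled watermarks is that no timestamp/watermark assigner was configured on the stream. A minimal sketch, assuming an illustrative event type with an embedded timestamp and a 10-second out-of-orderness bound (both are assumptions, not part of this FAQ):

~~~scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Illustrative event type carrying its own event-time timestamp.
case class Event(id: String, timestampMillis: Long)

def withEventTime(env: StreamExecutionEnvironment,
                  events: DataStream[Event]): DataStream[Event] = {
  // Event time must be enabled explicitly (processing time is the default).
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Without an assigner like this one, no watermarks are emitted and
  // event-time windows downstream never trigger.
  events.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
      override def extractTimestamp(event: Event): Long = event.timestampMillis
    })
}
~~~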
-## Features +You can check in Flink's web UI (watermarks section) whether watermarks are making progress. -### What kind of fault-tolerance does Flink provide? +## I see an exception reporting "Insufficient number of network buffers". -For streaming programs Flink has a novel approach to draw periodic snapshots of the streaming dataflow state and use those for recovery. -This mechanism is both efficient and flexible. See the documentation on [streaming fault tolerance]({{ site.docs-snapshot }}/internals/stream_checkpointing.html) for details. +If you run Flink with a very high parallelism, you may need to increase the number of network buffers. -For batch processing programs Flink remembers the program's sequence of transformations and can restart failed jobs. +By default, Flink takes 10% of the JVM heap size for network buffers, with a minimum of 64MB and a maximum of 1GB. +You can adjust all these values via `taskmanager.network.memory.fraction`, `taskmanager.network.memory.min`, and +`taskmanager.network.memory.max`. +Please refer to the [Configuration Reference]({{ site.docs-snapshot }}/ops/config.html#configuring-the-network-buffers) for details. -### Are Hadoop-like utilities, such as Counters and the DistributedCache supported? +## My job fails with various exceptions from the HDFS/Hadoop code. What can I do? -[Flink's Accumulators]({{ site.docs-snapshot }}/dev/api_concepts.html#accumulators--counters) work very similar like -Hadoop's counters, but are more powerful. +The most common cause for that is that the Hadoop version in Flink's classpath is different than the +Hadoop version of the cluster you want to connect to (HDFS / YARN). -Flink has a [Distributed Cache](https://github.com/apache/flink/tree/master/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java) that is deeply integrated with the APIs. Please refer to the [JavaDocs](https://github.com/apache/flink/tree/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L831) for details on how to use it. +The easiest way to fix that is to pick a Hadoop-free Flink version and simply export the Hadoop path and +classpath from the cluster. -In order to make data sets available on all tasks, we encourage you to use [Broadcast Variables]({{ site.docs-snapshot }}/dev/batch/index.html#broadcast-variables) instead. They are more efficient and easier to use than the distributed cache.
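For reference, the network buffer options named above are set in `conf/flink-conf.yaml`; the concrete values below are only an example (the min/max values are given in bytes in Flink 1.4):

~~~
# Fraction of the JVM heap used for network buffers (default 0.1),
# bounded by the min/max values below (in bytes).
taskmanager.network.memory.fraction: 0.15
# 64 MB minimum
taskmanager.network.memory.min: 67108864
# 1 GB maximum
taskmanager.network.memory.max: 1073741824
~~~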
