Repository: flink
Updated Branches:
  refs/heads/master bce355093 -> ba62df14a


[FLINK-3641] Add documentation for DataSet distributed cache.

This closes #2122


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba62df14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba62df14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba62df14

Branch: refs/heads/master
Commit: ba62df14a52660cec85783b1070821acd144fd06
Parents: 5a0c268
Author: Fabian Hueske <[email protected]>
Authored: Wed Jun 15 14:19:43 2016 +0200
Committer: Fabian Hueske <[email protected]>
Committed: Sat Jun 18 23:40:23 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/index.md | 106 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 105 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba62df14/docs/apis/batch/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/index.md b/docs/apis/batch/index.md
index 993fb72..42c6c92 100644
--- a/docs/apis/batch/index.md
+++ b/docs/apis/batch/index.md
@@ -2019,6 +2019,110 @@ of a function, or use the `withParameters(...)` method to pass in a configuratio
 
 {% top %}
 
+Distributed Cache
+-------------------
+
+Flink offers a distributed cache, similar to Apache Hadoop's, to make files locally accessible to parallel instances of user functions. This functionality can be used to share files that contain static external data such as dictionaries or machine-learned regression models.
+
+The cache works as follows. A program registers a file or directory of a [local or remote filesystem such as HDFS or S3]({{ site.baseurl }}/apis/batch/connectors.html#reading-from-file-systems) under a specific name in its `ExecutionEnvironment` as a cached file. When the program is executed, Flink automatically copies the file or directory to the local filesystem of all workers. A user function can look up the file or directory under the specified name and access it from the worker's local filesystem.
+
+The distributed cache is used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+Register the file or directory in the `ExecutionEnvironment`.
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// register a file from HDFS
+env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
+
+// register a local executable file (script, executable, ...)
+env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
+
+// define your program and execute
+...
+DataSet<String> input = ...
+DataSet<Integer> result = input.map(new MyMapper());
+...
+env.execute();
+{% endhighlight %}
+
+Access the cached file or directory in a user function (here a `MapFunction`). The function must extend a [RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) class because it needs access to the `RuntimeContext`.
+
+{% highlight java %}
+
+// extend a RichFunction to have access to the RuntimeContext
+public final class MyMapper extends RichMapFunction<String, Integer> {
+    
+    @Override
+    public void open(Configuration config) {
+      
+      // access cached file via RuntimeContext and DistributedCache
+      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
+      // read the file (or navigate the directory)
+      ...
+    }
+
+    @Override
+    public Integer map(String value) throws Exception {
+      // use content of cached file
+      ...
+    }
+}
+{% endhighlight %}
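+
+For example, if the cached file were a plain-text dictionary with one entry per line, it could be loaded into a set once per parallel instance, roughly as in the following sketch. The class name `MyDictionaryMapper`, the field `dictionary`, and the one-entry-per-line format are only illustrative assumptions; imports are omitted as in the examples above.
+
+{% highlight java %}
+public final class MyDictionaryMapper extends RichMapFunction<String, Integer> {
+
+    // dictionary entries loaded from the cached file in open()
+    private Set<String> dictionary;
+
+    @Override
+    public void open(Configuration config) throws Exception {
+      // the distributed cache hands back a java.io.File pointing to the local copy
+      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
+      // assumes a plain-text file with one dictionary entry per line
+      dictionary = new HashSet<>(Files.readAllLines(myFile.toPath(), StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public Integer map(String value) throws Exception {
+      // emit 1 if the input value is a dictionary entry, 0 otherwise
+      return dictionary.contains(value) ? 1 : 0;
+    }
+}
+{% endhighlight %}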
+
+</div>
+<div data-lang="scala" markdown="1">
+
+Register the file or directory in the `ExecutionEnvironment`.
+
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// register a file from HDFS
+env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
+
+// register a local executable file (script, executable, ...)
+env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
+
+// define your program and execute
+...
+val input: DataSet[String] = ...
+val result: DataSet[Int] = input.map(new MyMapper())
+...
+env.execute()
+{% endhighlight %}
+
+Access the cached file in a user function (here a `MapFunction`). The function must extend a [RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) class because it needs access to the `RuntimeContext`.
+
+{% highlight scala %}
+
+// extend a RichFunction to have access to the RuntimeContext
+class MyMapper extends RichMapFunction[String, Int] {
+
+  override def open(config: Configuration): Unit = {
+
+    // access cached file via RuntimeContext and DistributedCache
+    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
+    // read the file (or navigate the directory)
+    ...
+  }
+
+  override def map(value: String): Int = {
+    // use content of cached file
+    ...
+  }
+}
+{% endhighlight %}
+
+</div>
+</div>
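+
+If a directory is registered instead of a single file, `getFile(...)` returns the local copy of that directory, and its contents can be navigated with the regular `java.io.File` API. A rough sketch (the registered name `cachedDir` is just an example):
+
+{% highlight java %}
+// inside open() of a RichFunction: look up the local copy of a registered directory
+File cachedDir = getRuntimeContext().getDistributedCache().getFile("cachedDir");
+
+// iterate over the files that were shipped as part of the directory
+for (File f : cachedDir.listFiles()) {
+  // process each file, e.g., print its local path
+  System.out.println(f.getAbsolutePath());
+}
+{% endhighlight %}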
+
+{% top %}
+
 Passing Parameters to Functions
 -------------------
 
@@ -2067,7 +2171,7 @@ class MyFilter(limit: Int) extends FilterFunction[Int] {
 
 #### Via `withParameters(Configuration)`
 
-This method takes a Configuration object as an argument, which will be passed to the [rich function](#rich-functions)'s `open()`
+This method takes a Configuration object as an argument, which will be passed to the [rich function]({{ site.baseurl }}/apis/common/index.html#rich-functions)'s `open()`
 method. The Configuration object is a Map from String keys to different value types.
 
 <div class="codetabs" markdown="1">
