http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/metrics.md ---------------------------------------------------------------------- diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md deleted file mode 100644 index 1cc7a29..0000000 --- a/docs/apis/metrics.md +++ /dev/null @@ -1,470 +0,0 @@ ---- -title: "Metrics" -# Top-level navigation -top-nav-group: apis -top-nav-pos: 13 -top-nav-title: "Metrics" ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -Flink exposes a metric system that allows gathering and exposing metrics to external systems. - -* This will be replaced by the TOC -{:toc} - -## Registering metrics - -You can access the metric system from any user function that extends [RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by calling `getRuntimeContext().getMetricGroup()`. -This method returns a `MetricGroup` object on which you can create and register new metrics. - -### Metric types - -Flink supports `Counters`, `Gauges` and `Histograms`. - -#### Counter - -A `Counter` is used to count something. The current value can be in- or decremented using `inc()/inc(long n)` or `dec()/dec(long n)`. -You can create and register a `Counter` by calling `counter(String name)` on a `MetricGroup`. 
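The increment and decrement semantics can be illustrated without Flink. The sketch below defines a hypothetical `Counter` interface that only mirrors the `inc()/inc(long n)/dec()/dec(long n)` methods named above, plus a `getCount()` accessor. It is not Flink's `org.apache.flink.metrics.Counter`, and `SimpleCounter` is an illustrative name. The implementation is unsynchronized, on the assumption that it is updated from a single thread.

```java
public class CounterSketch {

    /** Hypothetical stand-in that mirrors the counter methods described above. */
    interface Counter {
        void inc();
        void inc(long n);
        void dec();
        void dec(long n);
        long getCount();
    }

    /** Plain, unsynchronized counter: assumes a single updating thread. */
    static class SimpleCounter implements Counter {
        private long count;

        @Override public void inc() { count++; }
        @Override public void inc(long n) { count += n; }
        @Override public void dec() { count--; }
        @Override public void dec(long n) { count -= n; }
        @Override public long getCount() { return count; }
    }

    public static void main(String[] args) {
        Counter counter = new SimpleCounter();
        counter.inc();     // 1
        counter.inc(5);    // 6
        counter.dec(2);    // 4
        System.out.println(counter.getCount()); // prints 4
    }
}
```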
- -{% highlight java %} - -public class MyMapper extends RichMapFunction<String, Integer> { - private Counter counter; - - @Override - public void open(Configuration config) { - this.counter = getRuntimeContext() - .getMetricGroup() - .counter("myCounter"); - } - - @Override - public Integer map(String value) throws Exception { - this.counter.inc(); - return value.length(); // illustrative result - } -} - -{% endhighlight %} - -Alternatively, you can use your own `Counter` implementation: - -{% highlight java %} - -public class MyMapper extends RichMapFunction<String, Integer> { - private Counter counter; - - @Override - public void open(Configuration config) { - this.counter = getRuntimeContext() - .getMetricGroup() - .counter("myCustomCounter", new CustomCounter()); - } - - @Override - public Integer map(String value) throws Exception { - this.counter.inc(); - return value.length(); // illustrative result - } -} - -{% endhighlight %} - -#### Gauge - -A `Gauge` provides a value of any type on demand. In order to use a `Gauge` you must first create a class that implements the `org.apache.flink.metrics.Gauge` interface. -There is no restriction on the type of the returned value. -You can register a gauge by calling `gauge(String name, Gauge gauge)` on a `MetricGroup`. - -{% highlight java %} - -public class MyMapper extends RichMapFunction<String, Integer> { - private int valueToExpose; - - @Override - public void open(Configuration config) { - getRuntimeContext() - .getMetricGroup() - .gauge("MyGauge", new Gauge<Integer>() { - @Override - public Integer getValue() { - return valueToExpose; - } - }); - } - - @Override - public Integer map(String value) throws Exception { - valueToExpose++; - return valueToExpose; - } -} - -{% endhighlight %} - -Note that reporters will turn the exposed object into a `String`, which means that a meaningful `toString()` implementation is required. - -#### Histogram - -A `Histogram` measures the distribution of long values. -You can register one by calling `histogram(String name, Histogram histogram)` on a `MetricGroup`. 
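To make "distribution of long values" concrete, the following self-contained sketch keeps only the most recent N values, similar in spirit to the sliding-window reservoir used in the DropWizard example, and derives min, max and mean from them. It does not implement Flink's `Histogram` interface; the class and method names are illustrative assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class HistogramSketch {

    /** Keeps the most recent windowSize values and computes simple statistics over them. */
    static class SlidingWindowHistogram {
        private final int windowSize;
        private final Deque<Long> window = new ArrayDeque<>();
        private long totalCount; // every value ever recorded, not only those still in the window

        SlidingWindowHistogram(int windowSize) {
            if (windowSize <= 0) {
                throw new IllegalArgumentException("windowSize must be positive");
            }
            this.windowSize = windowSize;
        }

        void update(long value) {
            if (window.size() == windowSize) {
                window.removeFirst(); // evict the oldest value
            }
            window.addLast(value);
            totalCount++;
        }

        long getCount() { return totalCount; }

        long min() { return window.stream().mapToLong(Long::longValue).min().orElse(0L); }

        long max() { return window.stream().mapToLong(Long::longValue).max().orElse(0L); }

        double mean() { return window.stream().mapToLong(Long::longValue).average().orElse(0.0); }
    }

    public static void main(String[] args) {
        SlidingWindowHistogram h = new SlidingWindowHistogram(3);
        for (long v : new long[] {10, 20, 30, 40}) {
            h.update(v);
        }
        // The window now holds 20, 30, 40; the value 10 was evicted.
        System.out.println(h.getCount() + " " + h.min() + " " + h.max() + " " + h.mean());
        // prints: 4 20 40 30.0
    }
}
```

Note that the count covers all updates while the statistics only cover the window; this trade-off bounds memory at the cost of forgetting older values.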
- -{% highlight java %} -public class MyMapper extends RichMapFunction<Long, Integer> { - private Histogram histogram; - - @Override - public void open(Configuration config) { - this.histogram = getRuntimeContext() - .getMetricGroup() - .histogram("myHistogram", new MyHistogram()); - } - - @Override - public Integer map(Long value) throws Exception { - this.histogram.update(value); - return value.intValue(); // illustrative result - } -} -{% endhighlight %} - -Flink does not provide a default implementation for `Histogram`, but offers a {% gh_link flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java "Wrapper" %} that allows usage of Codahale/DropWizard histograms. -To use this wrapper, add the following dependency to your `pom.xml`: -{% highlight xml %} -<dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-dropwizard</artifactId> - <version>{{site.version}}</version> -</dependency> -{% endhighlight %} - -You can then register a Codahale/DropWizard histogram like this: - -{% highlight java %} -public class MyMapper extends RichMapFunction<Long, Integer> { - private Histogram histogram; - - @Override - public void open(Configuration config) { - com.codahale.metrics.Histogram histogram = - new com.codahale.metrics.Histogram(new com.codahale.metrics.SlidingWindowReservoir(500)); - - this.histogram = getRuntimeContext() - .getMetricGroup() - .histogram("myHistogram", new DropwizardHistogramWrapper(histogram)); - } - - @Override - public Integer map(Long value) throws Exception { - this.histogram.update(value); - return value.intValue(); // illustrative result - } -} -{% endhighlight %} - -## Scope - -Every metric is assigned an identifier under which it is reported. The identifier is based on three components: the user-provided name used when registering the metric, an optional user-defined scope, and a system-provided scope. -For example, if `A.B` is the system scope, `C.D` the user scope and `E` the name, then the identifier for the metric will be `A.B.C.D.E`. - -You can configure which delimiter to use for the identifier (default: `.`) by setting the `metrics.scope.delimiter` key in `conf/flink-conf.yaml`. 
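The composition rule above (system scope, then user scope, then name, joined by the configured delimiter) can be sketched in a few lines. This is a hypothetical helper written for illustration, not Flink's implementation; the class and method names are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ScopeSketch {

    /** Joins system scope, user scope and metric name with the given delimiter (hypothetical helper). */
    static String identifier(List<String> systemScope, List<String> userScope,
                             String name, String delimiter) {
        List<String> parts = new ArrayList<>(systemScope);
        parts.addAll(userScope);
        parts.add(name);
        return String.join(delimiter, parts);
    }

    public static void main(String[] args) {
        List<String> system = Arrays.asList("A", "B");
        List<String> user = Arrays.asList("C", "D");
        System.out.println(identifier(system, user, "E", "."));  // prints A.B.C.D.E
        System.out.println(identifier(system, user, "E", "_"));  // prints A_B_C_D_E
    }
}
```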
- -### User Scope - -You can define a user scope by calling either `MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`. - -{% highlight java %} - -counter = getRuntimeContext() - .getMetricGroup() - .addGroup("MyMetrics") - .counter("myCounter"); - -{% endhighlight %} - -### System Scope - -The system scope contains context information about the metric, for example in which task it was registered or what job that task belongs to. - -Which context information should be included can be configured by setting the following keys in `conf/flink-conf.yaml`. -Each of these keys expects a format string that may contain constants (e.g. `taskmanager`) and variables (e.g. `<task_id>`), which are replaced at runtime. - -- `metrics.scope.jm` - - Default: `<host>.jobmanager` - - Applied to all metrics that were scoped to a job manager. -- `metrics.scope.jm.job` - - Default: `<host>.jobmanager.<job_name>` - - Applied to all metrics that were scoped to a job manager and job. -- `metrics.scope.tm` - - Default: `<host>.taskmanager.<tm_id>` - - Applied to all metrics that were scoped to a task manager. -- `metrics.scope.tm.job` - - Default: `<host>.taskmanager.<tm_id>.<job_name>` - - Applied to all metrics that were scoped to a task manager and job. -- `metrics.scope.task` - - Default: `<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>` - - Applied to all metrics that were scoped to a task. -- `metrics.scope.operator` - - Default: `<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>` - - Applied to all metrics that were scoped to an operator. - -There are no restrictions on the number or order of variables. Variables are case sensitive. 
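The variable replacement described above can be sketched as a regex substitution over `<...>` placeholders. This is a minimal illustration assuming a simple map from variable to value; it is not how Flink resolves scope formats internally, and the class name is hypothetical. Matching is case sensitive, and unknown variables are left untouched.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ScopeFormatSketch {

    private static final Pattern VARIABLE = Pattern.compile("<[^<>]+>");

    /** Replaces each "<variable>" with its value; case sensitive, unknown variables kept as-is. */
    static String resolve(String format, Map<String, String> variables) {
        Matcher m = VARIABLE.matcher(format);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = variables.getOrDefault(m.group(), m.group());
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<>();
        vars.put("<host>", "localhost");
        vars.put("<tm_id>", "1234");
        vars.put("<job_name>", "MyJob");
        vars.put("<operator_name>", "MyOperator");
        vars.put("<subtask_index>", "0");

        String format = "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>";
        System.out.println(resolve(format, vars));
        // prints localhost.taskmanager.1234.MyJob.MyOperator.0
    }
}
```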
- -The default scope for operator metrics will result in an identifier akin to `localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric`. - -If you also want to include the task name but omit the task manager information, you can specify the following format: - -`metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>` - -This could create the identifier `localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric`. - -Note that with this format string an identifier clash can occur if the same job is run multiple times concurrently, which can lead to inconsistent metric data. -It is therefore advisable to either use format strings that provide a certain degree of uniqueness by including IDs (e.g. `<job_id>`) -or to assign unique names to jobs and operators. - -### List of all Variables - -- JobManager: `<host>` -- TaskManager: `<host>`, `<tm_id>` -- Job: `<job_id>`, `<job_name>` -- Task: `<task_id>`, `<task_name>`, `<task_attempt_id>`, `<task_attempt_num>`, `<subtask_index>` -- Operator: `<operator_name>`, `<subtask_index>` - -## Reporter - -Metrics can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`. - -- `metrics.reporters`: The list of named reporters. -- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`. -- `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`. -- `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`. - -All reporters must have at least the `class` property; some also allow specifying a reporting `interval`. The sections -below list more settings specific to each reporter. 
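The key layout above implies a simple grouping step: read the reporter names from `metrics.reporters`, then collect every `metrics.reporter.<name>.<config>` entry under its reporter. The sketch below shows that grouping over a plain `Map`, assuming the configuration has already been loaded into it. It is a hypothetical helper, not Flink's configuration code; the check for `class` reflects the rule stated above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReporterConfigSketch {

    /** Groups "metrics.reporter.<name>.<config>" entries by the names listed in "metrics.reporters". */
    static Map<String, Map<String, String>> reporterConfigs(Map<String, String> conf) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        String list = conf.getOrDefault("metrics.reporters", "");
        for (String raw : list.split(",")) {
            String name = raw.trim();
            if (name.isEmpty()) {
                continue;
            }
            String prefix = "metrics.reporter." + name + ".";
            Map<String, String> settings = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : conf.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    settings.put(e.getKey().substring(prefix.length()), e.getValue());
                }
            }
            // Every reporter must at least define its class.
            if (!settings.containsKey("class")) {
                throw new IllegalArgumentException("Reporter '" + name + "' has no class property");
            }
            result.put(name, settings);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("metrics.reporters", "my_jmx_reporter,my_other_reporter");
        conf.put("metrics.reporter.my_jmx_reporter.class", "org.apache.flink.metrics.jmx.JMXReporter");
        conf.put("metrics.reporter.my_jmx_reporter.port", "9020-9040");
        conf.put("metrics.reporter.my_other_reporter.class", "org.apache.flink.metrics.graphite.GraphiteReporter");
        conf.put("metrics.reporter.my_other_reporter.host", "192.168.1.1");
        conf.put("metrics.reporter.my_other_reporter.port", "10000");

        reporterConfigs(conf).forEach((name, settings) -> System.out.println(name + " -> " + settings));
    }
}
```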
- -Example reporter configuration that specifies multiple reporters: - -``` -metrics.reporters: my_jmx_reporter,my_other_reporter - -metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter -metrics.reporter.my_jmx_reporter.port: 9020-9040 - -metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter -metrics.reporter.my_other_reporter.host: 192.168.1.1 -metrics.reporter.my_other_reporter.port: 10000 - -``` - -You can write your own `Reporter` by implementing the `org.apache.flink.metrics.reporter.MetricReporter` interface. -If the Reporter should send out reports regularly you have to implement the `Scheduled` interface as well. - -The following sections list the supported reporters. - -### JMX (org.apache.flink.metrics.jmx.JMXReporter) - -You don't have to include an additional dependency since the JMX reporter is available by default -but not activated. - -Parameters: - -- `port` - the port on which JMX listens for connections. This can also be a port range. When a -range is specified the actual port is shown in the relevant job or task manager log. If you don't -specify a port no extra JMX server will be started. Metrics are still available on the default -local JMX interface. 
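To illustrate what "available on the default local JMX interface" means, the sketch below uses only the JDK's `java.lang.management` and `javax.management` APIs to register a counter-like standard MBean on the JVM's platform MBean server and read its attribute back. It is not Flink's `JMXReporter`; the object name `sketch.metrics:type=Counter,name=myCounter` and all class names are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxSketch {

    /** Standard MBean interface; JMX derives a read-only "Count" attribute from getCount(). */
    public interface CounterBeanMBean {
        long getCount();
    }

    /** Standard MBean naming rule: the interface name is the class name plus "MBean". */
    public static class CounterBean implements CounterBeanMBean {
        private final AtomicLong count = new AtomicLong();

        public void inc() { count.incrementAndGet(); }

        @Override
        public long getCount() { return count.get(); }
    }

    /** Registers the bean on the platform MBean server, reads "Count" back, then unregisters it. */
    static long registerAndRead(String objectName, CounterBean bean) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName name = new ObjectName(objectName);
            server.registerMBean(bean, name);
            try {
                return (Long) server.getAttribute(name, "Count");
            } finally {
                server.unregisterMBean(name); // clean up so the sketch can be rerun
            }
        } catch (JMException e) {
            throw new IllegalStateException("JMX operation failed", e);
        }
    }

    public static void main(String[] args) {
        CounterBean bean = new CounterBean();
        bean.inc();
        bean.inc();
        System.out.println(registerAndRead("sketch.metrics:type=Counter,name=myCounter", bean)); // prints 2
    }
}
```

While such a bean is registered, local tools such as JConsole can browse it without any extra JMX server, which is the situation the `port` description refers to.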
- -### Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter) -Dependency: -{% highlight xml %} -<dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-ganglia</artifactId> - <version>{{site.version}}</version> -</dependency> -{% endhighlight %} - -Parameters: - -- `host` - the gmond host address configured under `udp_recv_channel.bind` in `gmond.conf` -- `port` - the gmond port configured under `udp_recv_channel.port` in `gmond.conf` -- `tmax` - soft limit for how long an old metric should be retained -- `dmax` - hard limit for how long an old metric should be retained -- `ttl` - time-to-live for transmitted UDP packets -- `addressingMode` - UDP addressing mode to use (UNICAST/MULTICAST) - -### Graphite (org.apache.flink.metrics.graphite.GraphiteReporter) -Dependency: -{% highlight xml %} -<dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-graphite</artifactId> - <version>{{site.version}}</version> -</dependency> -{% endhighlight %} - -Parameters: - -- `host` - the Graphite server host -- `port` - the Graphite server port - -### StatsD (org.apache.flink.metrics.statsd.StatsDReporter) -Dependency: -{% highlight xml %} -<dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-statsd</artifactId> - <version>{{site.version}}</version> -</dependency> -{% endhighlight %} - -Parameters: - -- `host` - the StatsD server host -- `port` - the StatsD server port - -## System metrics - -Flink exposes the following system metrics: - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left" style="width: 20%">Scope</th> - <th class="text-left">Metrics</th> - <th class="text-left">Description</th> - </tr> - </thead> - - <tbody> - <tr> - <th rowspan="1"><strong>JobManager</strong></th> - <td></td> - <td></td> - </tr> - <tr> - <th rowspan="19"><strong>TaskManager.Status.JVM</strong></th> - <td>ClassLoader.ClassesLoaded</td> - <td>The total number of classes loaded since the start of 
the JVM.</td> - </tr> - <tr> - <td>ClassLoader.ClassesUnloaded</td> - <td>The total number of classes unloaded since the start of the JVM.</td> - </tr> - <tr> - <td>GarbageCollector.&lt;garbageCollector&gt;.Count</td> - <td>The total number of collections that have occurred.</td> - </tr> - <tr> - <td>GarbageCollector.&lt;garbageCollector&gt;.Time</td> - <td>The total time spent performing garbage collection.</td> - </tr> - <tr> - <td>Memory.Heap.Used</td> - <td>The amount of heap memory currently used.</td> - </tr> - <tr> - <td>Memory.Heap.Committed</td> - <td>The amount of heap memory guaranteed to be available to the JVM.</td> - </tr> - <tr> - <td>Memory.Heap.Max</td> - <td>The maximum amount of heap memory that can be used for memory management.</td> - </tr> - <tr> - <td>Memory.NonHeap.Used</td> - <td>The amount of non-heap memory currently used.</td> - </tr> - <tr> - <td>Memory.NonHeap.Committed</td> - <td>The amount of non-heap memory guaranteed to be available to the JVM.</td> - </tr> - <tr> - <td>Memory.NonHeap.Max</td> - <td>The maximum amount of non-heap memory that can be used for memory management.</td> - </tr> - <tr> - <td>Memory.Direct.Count</td> - <td>The number of buffers in the direct buffer pool.</td> - </tr> - <tr> - <td>Memory.Direct.MemoryUsed</td> - <td>The amount of memory used by the JVM for the direct buffer pool.</td> - </tr> - <tr> - <td>Memory.Direct.TotalCapacity</td> - <td>The total capacity of all buffers in the direct buffer pool.</td> - </tr> - <tr> - <td>Memory.Mapped.Count</td> - <td>The number of buffers in the mapped buffer pool.</td> - </tr> - <tr> - <td>Memory.Mapped.MemoryUsed</td> - <td>The amount of memory used by the JVM for the mapped buffer pool.</td> - </tr> - <tr> - <td>Memory.Mapped.TotalCapacity</td> - <td>The total capacity of all buffers in the mapped buffer pool.</td> - </tr> - <tr> - <td>Threads.Count</td> - <td>The total number of live threads.</td> - </tr> - <tr> - <td>CPU.Load</td> - <td>The recent CPU usage of the JVM.</td> - </tr> - 
<tr> - <td>CPU.Time</td> - <td>The CPU time used by the JVM.</td> - </tr> - <tr> - <th rowspan="1"><strong>Job</strong></th> - <td></td> - <td></td> - </tr> - <tr> - <th rowspan="7"><strong>Task</strong></th> - <td>currentLowWatermark</td> - <td>The lowest watermark a task has received.</td> - </tr> - <tr> - <td>lastCheckpointDuration</td> - <td>The time it took to complete the last checkpoint.</td> - </tr> - <tr> - <td>lastCheckpointSize</td> - <td>The total size of the last checkpoint.</td> - </tr> - <tr> - <td>restartingTime</td> - <td>The time it took to restart the job.</td> - </tr> - <tr> - <td>numBytesInLocal</td> - <td>The total number of bytes this task has read from a local source.</td> - </tr> - <tr> - <td>numBytesInRemote</td> - <td>The total number of bytes this task has read from a remote source.</td> - </tr> - <tr> - <td>numBytesOut</td> - <td>The total number of bytes this task has emitted.</td> - </tr> - <tr> - <th rowspan="3"><strong>Operator</strong></th> - <td>numRecordsIn</td> - <td>The total number of records this operator has received.</td> - </tr> - <tr> - <td>numRecordsOut</td> - <td>The total number of records this operator has emitted.</td> - </tr> - <tr> - <td>numSplitsProcessed</td> - <td>The total number of InputSplits this data source has processed.</td> - </tr> - </tbody> -</table> - -{% top %}
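Most of the JVM values in the system metrics table have direct counterparts in the JDK's `java.lang.management` API. The sketch below reads a subset of them, which can help sanity-check what a reporter shows. It is an assumption that Flink derives its values from these same MXBeans; the key names merely mirror the table, and the class name is hypothetical.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.LinkedHashMap;
import java.util.Map;

public class JvmMetricsSketch {

    /** Collects a subset of JVM values similar to the TaskManager.Status.JVM metrics. */
    static Map<String, Long> snapshot() {
        Map<String, Long> m = new LinkedHashMap<>();

        ClassLoadingMXBean classLoading = ManagementFactory.getClassLoadingMXBean();
        m.put("ClassLoader.ClassesLoaded", classLoading.getTotalLoadedClassCount());
        m.put("ClassLoader.ClassesUnloaded", classLoading.getUnloadedClassCount());

        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // A value of -1 means the collector does not report this value.
            m.put("GarbageCollector." + gc.getName() + ".Count", gc.getCollectionCount());
            m.put("GarbageCollector." + gc.getName() + ".Time", gc.getCollectionTime()); // milliseconds
        }

        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        m.put("Memory.Heap.Used", heap.getUsed());
        m.put("Memory.Heap.Committed", heap.getCommitted());
        m.put("Memory.Heap.Max", heap.getMax()); // -1 if undefined

        MemoryUsage nonHeap = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage();
        m.put("Memory.NonHeap.Used", nonHeap.getUsed());
        m.put("Memory.NonHeap.Committed", nonHeap.getCommitted());
        m.put("Memory.NonHeap.Max", nonHeap.getMax());

        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            String prefix;
            if ("direct".equals(pool.getName())) {
                prefix = "Memory.Direct";
            } else if ("mapped".equals(pool.getName())) {
                prefix = "Memory.Mapped";
            } else {
                continue; // newer JDKs may report additional pools
            }
            m.put(prefix + ".Count", pool.getCount());
            m.put(prefix + ".MemoryUsed", pool.getMemoryUsed());
            m.put(prefix + ".TotalCapacity", pool.getTotalCapacity());
        }

        m.put("Threads.Count", (long) ManagementFactory.getThreadMXBean().getThreadCount());
        return m;
    }

    public static void main(String[] args) {
        snapshot().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```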
http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/programming_guide.md ---------------------------------------------------------------------- diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md deleted file mode 100644 index 0d865fe..0000000 --- a/docs/apis/programming_guide.md +++ /dev/null @@ -1,26 +0,0 @@ ---- -title: DataSet API ---- - -<meta http-equiv="refresh" content="1; url={{ site.baseurl }}/apis/batch/index.html" /> - -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -The *DataSet API guide* has been moved. Redirecting to [{{ site.baseurl }}/apis/batch/index.html]({{ site.baseurl }}/apis/batch/index.html) in 1 second. http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/scala_api_extensions.md ---------------------------------------------------------------------- diff --git a/docs/apis/scala_api_extensions.md b/docs/apis/scala_api_extensions.md deleted file mode 100644 index e3268bf..0000000 --- a/docs/apis/scala_api_extensions.md +++ /dev/null @@ -1,409 +0,0 @@ ---- -title: "Scala API Extensions" -# Top-level navigation -top-nav-group: apis -top-nav-pos: 11 ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -In order to keep a fair amount of consistency between the Scala and Java APIs, some -of the features that allow a high level of expressiveness in Scala have been left -out of the standard APIs for both batch and streaming. - -If you want to _enjoy the full Scala experience_, you can choose to opt in to -extensions that enhance the Scala API via implicit conversions. - -To use all the available extensions, you can just add a simple `import` for the -DataSet API - -{% highlight scala %} -import org.apache.flink.api.scala.extensions._ -{% endhighlight %} - -or the DataStream API - -{% highlight scala %} -import org.apache.flink.streaming.api.scala.extensions._ -{% endhighlight %} - -Alternatively, you can import individual extensions _à la carte_ to use only those -you prefer. - -## Accept partial functions - -Normally, both the DataSet and DataStream APIs don't accept anonymous pattern -matching functions to deconstruct tuples, case classes or collections, like the -following: - -{% highlight scala %} -val data: DataSet[(Int, String, Double)] = // [...] -data.map { - case (id, name, temperature) => // [...] - // The previous line causes the following compilation error: - // "The argument types of an anonymous function must be fully known.
(SLS 8.5)" -} -{% endhighlight %} - -This extension introduces new methods in both the DataSet and DataStream Scala APIs -that have a one-to-one correspondence to methods of the original API. These delegating methods -do support anonymous pattern matching functions. - -#### DataSet API - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left" style="width: 20%">Method</th> - <th class="text-left" style="width: 20%">Original</th> - <th class="text-center">Example</th> - </tr> - </thead> - - <tbody> - <tr> - <td><strong>mapWith</strong></td> - <td><strong>map (DataSet)</strong></td> - <td> -{% highlight scala %} -data.mapWith { - case (_, value) => value.toString -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>mapPartitionWith</strong></td> - <td><strong>mapPartition (DataSet)</strong></td> - <td> -{% highlight scala %} -data.mapPartitionWith { - case head #:: _ => head -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>flatMapWith</strong></td> - <td><strong>flatMap (DataSet)</strong></td> - <td> -{% highlight scala %} -data.flatMapWith { - case (_, name, visitTimes) => visitTimes.map(name -> _) -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>filterWith</strong></td> - <td><strong>filter (DataSet)</strong></td> - <td> -{% highlight scala %} -data.filterWith { - case Train(_, isOnTime) => isOnTime -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>reduceWith</strong></td> - <td><strong>reduce (DataSet, GroupedDataSet)</strong></td> - <td> -{% highlight scala %} -data.reduceWith { - case ((id, amount1), (_, amount2)) => (id, amount1 + amount2) -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>reduceGroupWith</strong></td> - <td><strong>reduceGroup (GroupedDataSet)</strong></td> - <td> -{% highlight scala %} -data.reduceGroupWith { - case id #:: value #:: _ => id -> value -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>groupingBy</strong></td> - <td><strong>groupBy (DataSet)</strong></td> - <td> -{% highlight scala %} -data.groupingBy { - case (id, _, _) => id -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>sortGroupWith</strong></td> - <td><strong>sortGroup (GroupedDataSet)</strong></td> - <td> -{% highlight scala %} -grouped.sortGroupWith(Order.ASCENDING) { - case House(_, value) => value -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>combineGroupWith</strong></td> - <td><strong>combineGroup (GroupedDataSet)</strong></td> - <td> -{% highlight scala %} -grouped.combineGroupWith { - case header #:: amounts => amounts.sum -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>projecting</strong></td> - <td><strong>apply (JoinDataSet, CrossDataSet)</strong></td> - <td> -{% highlight scala %} -data1.join(data2). - whereClause { case (pk, _) => pk }. - isEqualTo { case (_, fk) => fk }. - projecting { - case ((pk, tx), (products, fk)) => tx -> products - } - -data1.cross(data2).projecting { - case ((a, _), (_, b)) => a -> b -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>projecting</strong></td> - <td><strong>apply (CoGroupDataSet)</strong></td> - <td> -{% highlight scala %} -data1.coGroup(data2). - whereClause { case (pk, _) => pk }. - isEqualTo { case (_, fk) => fk }. - projecting { - case (head1 #:: _, head2 #:: _) => head1 -> head2 - } -{% endhighlight %} - </td> - </tr> - </tbody> -</table> - -#### DataStream API - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left" style="width: 20%">Method</th> - <th class="text-left" style="width: 20%">Original</th> - <th class="text-center">Example</th> - </tr> - </thead> - - <tbody> - <tr> - <td><strong>mapWith</strong></td> - <td><strong>map (DataStream)</strong></td> - <td> -{% highlight scala %} -data.mapWith { - case (_, value) => value.toString -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>mapPartitionWith</strong></td> - <td><strong>mapPartition (DataStream)</strong></td> - <td> -{% highlight scala %} -data.mapPartitionWith { - case head #:: _ => head -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>flatMapWith</strong></td> - <td><strong>flatMap (DataStream)</strong></td> - <td> -{% highlight scala %} -data.flatMapWith { - case (_, name, visits) => visits.map(name -> _) -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>filterWith</strong></td> - <td><strong>filter (DataStream)</strong></td> - <td> -{% highlight scala %} -data.filterWith { - case Train(_, isOnTime) => isOnTime -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>keyingBy</strong></td> - <td><strong>keyBy (DataStream)</strong></td> - <td> -{% highlight scala %} -data.keyingBy { - case (id, _, _) => id -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>mapWith</strong></td> - <td><strong>map (ConnectedDataStream)</strong></td> - <td> -{% highlight scala %} -data.mapWith( - map1 = { case (_, value) => value.toString }, - map2 = { case (_, _, value, _) => value + 1 } -) -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>flatMapWith</strong></td> - <td><strong>flatMap (ConnectedDataStream)</strong></td> - <td> -{% highlight scala %} -data.flatMapWith( - flatMap1 = { case (_, json) => parse(json) }, - flatMap2 = { case (_, _, json, _) => parse(json) } -) -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>keyingBy</strong></td> - <td><strong>keyBy (ConnectedDataStream)</strong></td> - <td> -{% highlight scala %} -data.keyingBy( - key1 = { case (_, timestamp) => timestamp }, - key2 = { case (id, _, _) => id } -) -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>reduceWith</strong></td> - <td><strong>reduce (KeyedDataStream, WindowedDataStream)</strong></td> - <td> -{% highlight scala %} -data.reduceWith { - case ((key, sum1), (_, sum2)) => (key, sum1 + sum2) -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>foldWith</strong></td> - <td><strong>fold (KeyedDataStream, WindowedDataStream)</strong></td> - <td> -{% highlight scala %} -data.foldWith(User(bought = 0)) { - case (User(b), (_, items)) => User(b + items.size) -} -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>applyWith</strong></td> - <td><strong>apply (WindowedDataStream)</strong></td> - <td> -{% highlight scala %} -data.applyWith(0)( - foldFunction = { case (sum, amount) => sum + amount }, - windowFunction = { - case (k, w, sum) => // [...] - } -) -{% endhighlight %} - </td> - </tr> - <tr> - <td><strong>projecting</strong></td> - <td><strong>apply (JoinedDataStream)</strong></td> - <td> -{% highlight scala %} -data1.join(data2). - whereClause { case (pk, _) => pk }. - isEqualTo { case (_, fk) => fk }. - projecting { - case ((pk, tx), (products, fk)) => tx -> products - } -{% endhighlight %} - </td> - </tr> - </tbody> -</table> - - - -For more information on the semantics of each method, please refer to the -[DataStream](streaming/index.html) and [DataSet](batch/index.html) API documentation. 
- -To use this extension exclusively, you can add the following `import`: - -{% highlight scala %} -import org.apache.flink.api.scala.extensions.acceptPartialFunctions -{% endhighlight %} - -for the DataSet extensions and - -{% highlight scala %} -import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions -{% endhighlight %} - -for the DataStream extensions. - -The following snippet shows a minimal example of how to use these extension -methods together (with the DataSet API): - -{% highlight scala %} -object Main { - import org.apache.flink.api.scala._ - import org.apache.flink.api.scala.extensions._ - case class Point(x: Double, y: Double) - def main(args: Array[String]): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6)) - ds.filterWith { - case Point(x, _) => x > 1 - }.reduceWith { - case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2) - }.mapWith { - case Point(x, y) => (x, y) - }.flatMapWith { - case (x, y) => Seq("x" -> x, "y" -> y) - }.groupingBy { - case (id, value) => id - } - } -} -{% endhighlight %} http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/scala_shell.md ---------------------------------------------------------------------- diff --git a/docs/apis/scala_shell.md b/docs/apis/scala_shell.md deleted file mode 100644 index ad36ca0..0000000 --- a/docs/apis/scala_shell.md +++ /dev/null @@ -1,197 +0,0 @@ ---- -title: "Scala Shell" -# Top-level navigation -top-nav-group: apis -top-nav-pos: 10 ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License.
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - - -Flink comes with an integrated interactive Scala Shell. -It can be used in a local setup as well as in a cluster setup. - - -To use the shell with an integrated Flink cluster, just execute: - -~~~bash -bin/start-scala-shell.sh local -~~~ - -from the root directory of your Flink binary distribution. To run the shell on a -cluster, please see the Setup section below. - - -## Usage - -The shell supports both batch and streaming programs. -Two different ExecutionEnvironments are automatically prebound after startup. -Use `benv` and `senv` to access the batch and streaming environments, respectively. - -### DataSet API - -The following example executes the word count program in the Scala shell: - -~~~scala -Scala-Flink> val text = benv.fromElements( - "To be, or not to be,--that is the question:--", - "Whether 'tis nobler in the mind to suffer", - "The slings and arrows of outrageous fortune", - "Or to take arms against a sea of troubles,") -Scala-Flink> val counts = text - .flatMap { _.toLowerCase.split("\\W+") } - .map { (_, 1) }.groupBy(0).sum(1) -Scala-Flink> counts.print() -~~~ - -The `print()` command automatically sends the specified tasks to the JobManager for execution and shows the result of the computation in the terminal. - -It is possible to write results to a file.
However, in this case you need to call `execute` to run your program: - -~~~scala -Scala-Flink> benv.execute("MyProgram") -~~~ - -### DataStream API - -Similar to the batch program above, we can execute a streaming program through the DataStream API: - -~~~scala -Scala-Flink> val textStreaming = senv.fromElements( - "To be, or not to be,--that is the question:--", - "Whether 'tis nobler in the mind to suffer", - "The slings and arrows of outrageous fortune", - "Or to take arms against a sea of troubles,") -Scala-Flink> val countsStreaming = textStreaming - .flatMap { _.toLowerCase.split("\\W+") } - .map { (_, 1) }.keyBy(0).sum(1) -Scala-Flink> countsStreaming.print() -Scala-Flink> senv.execute("Streaming Wordcount") -~~~ - -Note that in the streaming case, the print operation does not trigger execution directly. - -The Flink Shell comes with command history and auto-completion. - - -## Adding external dependencies - -It is possible to add external classpaths to the Scala shell. These are sent to the JobManager automatically, along with your shell program, when `execute` is called. - -Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes. - -~~~bash -bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar> -~~~ - - -## Setup - -To get an overview of what options the Scala Shell provides, please use - -~~~bash -bin/start-scala-shell.sh --help -~~~ - -### Local - -To use the shell with an integrated Flink cluster just execute: - -~~~bash -bin/start-scala-shell.sh local -~~~ - - -### Remote - -To use it with a running cluster, start the Scala shell with the keyword `remote` -and supply the host and port of the JobManager with: - -~~~bash -bin/start-scala-shell.sh remote <hostname> <portnumber> -~~~ - -### Yarn Scala Shell cluster - -The shell can deploy a Flink cluster to YARN, which is used exclusively by the -shell.
The number of YARN containers can be controlled by the parameter `-n <arg>`.
-The shell deploys a new Flink cluster on YARN and connects to the
-cluster. You can also specify options for the YARN cluster, such as the memory for the
-JobManager, the name of the YARN application, etc.
-
-For example, to start a YARN cluster for the Scala Shell with two TaskManagers,
-use the following:
-
-~~~bash
-bin/start-scala-shell.sh yarn -n 2
-~~~
-
-For all other options, see the full reference at the bottom.
-
-
-### Yarn Session
-
-If you have previously deployed a Flink cluster using the Flink Yarn Session,
-the Scala shell can connect to it using the following command:
-
-~~~bash
-bin/start-scala-shell.sh yarn
-~~~
-
-
-## Full Reference
-
-~~~bash
-Flink Scala Shell
-Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...
-
-Command: local [options]
-Starts Flink scala shell with a local Flink cluster
-  -a <path/to/jar> | --addclasspath <path/to/jar>
-        Specifies additional jars to be used in Flink
-Command: remote [options] <host> <port>
-Starts Flink scala shell connecting to a remote cluster
-  <host>
-        Remote host name as string
-  <port>
-        Remote port as integer
-
-  -a <path/to/jar> | --addclasspath <path/to/jar>
-        Specifies additional jars to be used in Flink
-Command: yarn [options]
-Starts Flink scala shell connecting to a yarn cluster
-  -n arg | --container arg
-        Number of YARN container to allocate (= Number of TaskManagers)
-  -jm arg | --jobManagerMemory arg
-        Memory for JobManager container [in MB]
-  -nm <value> | --name <value>
-        Set a custom name for the application on YARN
-  -qu <arg> | --queue <arg>
-        Specifies YARN queue
-  -s <arg> | --slots <arg>
-        Number of slots per TaskManager
-  -tm <arg> | --taskManagerMemory <arg>
-        Memory per TaskManager container [in MB]
-  -a <path/to/jar> | --addclasspath <path/to/jar>
-        Specifies additional jars to be used in Flink
-  --configDir <value>
-        The configuration directory.
-  -h | --help
-        Prints this usage text
-~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/cassandra.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/cassandra.md b/docs/apis/streaming/connectors/cassandra.md
deleted file mode 100644
index 28ad244..0000000
--- a/docs/apis/streaming/connectors/cassandra.md
+++ /dev/null
@@ -1,158 +0,0 @@
----
-title: "Apache Cassandra Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 1
-sub-nav-title: Cassandra
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides sinks that write data into a [Cassandra](https://cassandra.apache.org/) database.
-
-To use this connector, add the following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-cassandra{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary distribution.
See how to link with them for cluster execution [here]({{ site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
-
-#### Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
-
-#### Cassandra Sink
-
-Flink's Cassandra sinks are created by using the static `CassandraSink.addSink(DataStream<IN> input)` method.
-This method returns a `CassandraSinkBuilder`, which offers methods to further configure the sink.
-
-The following configuration methods can be used:
-
-1. setQuery(String query)
-2. setHost(String host[, int port])
-3. setClusterBuilder(ClusterBuilder builder)
-4. enableWriteAheadLog([CheckpointCommitter committer])
-5. build()
-
-*setQuery()* sets the query that is executed for every value the sink receives.
-*setHost()* sets the Cassandra host/port to connect to. This method is intended for simple use cases.
-*setClusterBuilder()* sets the cluster builder that is used to configure the connection to Cassandra. This method subsumes the *setHost()* functionality.
-*enableWriteAheadLog()* is an optional method that allows exactly-once processing for non-deterministic algorithms.
-
-A checkpoint committer stores additional information about completed checkpoints
-in some resource. This information is used to prevent a full replay of the last
-completed checkpoint in case of a failure.
-You can use a `CassandraCommitter` to store these in a separate table in Cassandra.
-Note that this table will NOT be cleaned up by Flink.
-
-*build()* finalizes the configuration and returns the CassandraSink.
-
-Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple
-times without changing the result) and checkpointing is enabled. In case of a failure, the failed
-checkpoint will be replayed completely.
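The replay argument above can be illustrated without Cassandra or Flink. The following plain-Java sketch is only an analogy, and all class and method names in it are hypothetical: a `HashMap` keyed by `id` stands in for a table written with an upsert-style query (a CQL `INSERT` overwrites the row with the same primary key), while a `List` stands in for a non-idempotent append. Writing the records of one checkpoint twice leaves the map unchanged but duplicates the list entries.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy, no Cassandra or Flink involved (all names are hypothetical).
class ReplaySketch {

    // Idempotent write: the row for an id is overwritten, like an upsert keyed by id.
    static Map<Long, Long> upsertAll(Map<Long, Long> table, List<long[]> records) {
        for (long[] r : records) {
            table.put(r[0], r[1]); // r[0] = id, r[1] = counter; last write wins
        }
        return table;
    }

    // Non-idempotent write: every call appends, so a replay duplicates rows.
    static List<long[]> appendAll(List<long[]> log, List<long[]> records) {
        log.addAll(records);
        return log;
    }

    // The records belonging to one checkpoint, as (id, counter) pairs.
    static List<long[]> checkpointRecords() {
        List<long[]> records = new ArrayList<>();
        records.add(new long[] {1L, 10L});
        records.add(new long[] {2L, 20L});
        return records;
    }

    public static void main(String[] args) {
        List<long[]> records = checkpointRecords();

        // The first attempt writes everything and then fails; the checkpoint is replayed in full.
        Map<Long, Long> table = upsertAll(new HashMap<>(), records);
        upsertAll(table, records);
        System.out.println("rows after replay with upserts: " + table.size()); // 2

        List<long[]> log = appendAll(new ArrayList<>(), records);
        appendAll(log, records);
        System.out.println("rows after replay with appends: " + log.size()); // 4
    }
}
```

The same reasoning explains why non-deterministic programs need more than idempotence: if the replayed records differ from the first attempt, overwriting by key no longer restores a consistent state.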
-
-Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program
-the replayed checkpoint may be completely different from the previous attempt, which may leave the
-database in an inconsistent state, since part of the first attempt may already have been written.
-The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt.
-Note that enabling this feature will have an adverse impact on latency.
-
-<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.</p>
-
-
-#### Example
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-    @Override
-    public Cluster buildCluster(Cluster.Builder builder) {
-      return builder.addContactPoint("127.0.0.1").build();
-    }
-  })
-  .build();
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-    override def buildCluster(builder: Cluster.Builder): Cluster = {
-      builder.addContactPoint("127.0.0.1").build()
-    }
-  })
-  .build()
-{% endhighlight %}
-</div>
-</div>
-
-The Cassandra sinks support both tuples and POJOs that use DataStax annotations.
-Flink automatically detects which type of input is used.
-
-Example of such a POJO:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-@Table(keyspace = "test", name = "mappersink")
-public class Pojo implements Serializable {
-
-    private static final long serialVersionUID = 1038054554690916991L;
-
-    @Column(name = "id")
-    private long id;
-    @Column(name = "value")
-    private String value;
-
-    public Pojo(long id, String value) {
-        this.id = id;
-        this.value = value;
-    }
-
-    public long getId() {
-        return id;
-    }
-
-    public void setId(long id) {
-        this.id = id;
-    }
-
-    public String getValue() {
-        return value;
-    }
-
-    public void setValue(String value) {
-        this.value = value;
-    }
-}
-{% endhighlight %}
-</div>
-</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/elasticsearch.md b/docs/apis/streaming/connectors/elasticsearch.md
deleted file mode 100644
index 93b2bf6..0000000
--- a/docs/apis/streaming/connectors/elasticsearch.md
+++ /dev/null
@@ -1,183 +0,0 @@
----
-title: "Elasticsearch Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 2
-sub-nav-title: Elasticsearch
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.
See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Elasticsearch
-
-Instructions for setting up an Elasticsearch cluster can be found
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster.
-
-#### Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
-
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
-
-This code shows how to create a sink that uses an embedded Node for
-communication:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
-
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
-
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-}));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[String] = ...
-
-val config = new util.HashMap[String, String]
-config.put("bulk.flush.max.actions", "1")
-config.put("cluster.name", "my-cluster-name")
-
-input.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
-  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
-    json.put("data", element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-}))
-{% endhighlight %}
-</div>
-</div>
-
-Note how a Map of Strings is used to configure the Sink. The configuration keys
-are documented in the Elasticsearch documentation
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
-Especially important is the `cluster.name` parameter, which must correspond to
-the name of your cluster.
-
-Internally, the sink uses a `BulkProcessor` to send index requests to the cluster.
-This will buffer elements before sending a request to the cluster.
The behaviour of the
-`BulkProcessor` can be configured using these config keys:
- * **bulk.flush.max.actions**: Maximum number of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval (in milliseconds) at which to flush data, regardless of the other two settings
-
-This example code does the same, but with a `TransportClient`:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
-
-List<TransportAddress> transports = new ArrayList<>();
-transports.add(new InetSocketTransportAddress("node-1", 9300));
-transports.add(new InetSocketTransportAddress("node-2", 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
-
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-}));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[String] = ...
-
-val config = new util.HashMap[String, String]
-config.put("bulk.flush.max.actions", "1")
-config.put("cluster.name", "my-cluster-name")
-
-val transports = new util.ArrayList[TransportAddress]
-transports.add(new InetSocketTransportAddress("node-1", 9300))
-transports.add(new InetSocketTransportAddress("node-2", 9300))
-
-input.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] {
-  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
-    json.put("data", element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-}))
-{% endhighlight %}
-</div>
-</div>
-
-The difference is that we now need to provide a list of Elasticsearch Nodes
-to which the sink should connect using a `TransportClient`.
-
-More information about Elasticsearch can be found [here](https://elastic.co).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/elasticsearch2.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/elasticsearch2.md b/docs/apis/streaming/connectors/elasticsearch2.md
deleted file mode 100644
index 36d0920..0000000
--- a/docs/apis/streaming/connectors/elasticsearch2.md
+++ /dev/null
@@ -1,144 +0,0 @@
----
-title: "Elasticsearch 2.x Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 2
-sub-nav-title: Elasticsearch 2.x
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.
You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to an
-[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Elasticsearch 2.x
-
-Instructions for setting up an Elasticsearch cluster can be found
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster.
-
-#### Elasticsearch 2.x Sink
-The connector provides a Sink that can send data to an Elasticsearch 2.x Index.
-
-The sink communicates with Elasticsearch via a Transport Client.
-
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)
-for information about the Transport Client.
-
-The code below shows how to create a sink that uses a `TransportClient` for communication:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-File dataDir = ....;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
-
-List<InetSocketAddress> transports = new ArrayList<>();
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transports, new ElasticsearchSinkFunction<String>() {
-    public IndexRequest createIndexRequest(String element) {
-        Map<String, String> json = new HashMap<>();
-        json.put("data", element);
-
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-
-    @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-        indexer.add(createIndexRequest(element));
-    }
-}));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val dataDir = ....;
-
-val input: DataStream[String] = ...
-
-val config = new util.HashMap[String, String]
-config.put("bulk.flush.max.actions", "1")
-config.put("cluster.name", "my-cluster-name")
-
-val transports = new util.ArrayList[InetSocketAddress]
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
-
-input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
-    json.put("data", element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-
-  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
-    indexer.add(createIndexRequest(element))
-  }
-}))
-{% endhighlight %}
-</div>
-</div>
-
-A Map of Strings is used to configure the Sink. The configuration keys
-are documented in the Elasticsearch documentation
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
-Especially important is the `cluster.name` parameter, which must correspond to
-the name of your cluster. With Elasticsearch 2.x you also need to specify `path.home`.
-
-Internally, the sink uses a `BulkProcessor` to send Action requests to the cluster.
-This will buffer elements and Action Requests before sending them to the cluster. The behaviour of the
-`BulkProcessor` can be configured using these config keys:
- * **bulk.flush.max.actions**: Maximum number of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval (in milliseconds) at which to flush data, regardless of the other two settings
-
-The `transports` list provides the Elasticsearch nodes
-to which the sink should connect via a `TransportClient`.
-
-More information about Elasticsearch can be found [here](https://elastic.co).
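The interplay of the three `bulk.flush.*` keys can be pictured with the following hypothetical buffer. It is a simplified sketch, not Elasticsearch's `BulkProcessor`: it flushes as soon as any one limit is reached, measures the buffered size in bytes of UTF-8 text instead of megabytes, and only checks the interval when an element is added (an assumption made for brevity; a real time-based flush would run on a scheduler). The caller passes the current time so that the behaviour is deterministic.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the bulk.flush.* semantics; NOT Elasticsearch's BulkProcessor.
class BulkBufferSketch {
    private final int maxActions;      // analogue of bulk.flush.max.actions
    private final long maxBytes;       // analogue of bulk.flush.max.size.mb, in bytes here
    private final long intervalMillis; // analogue of bulk.flush.interval.ms

    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private long lastFlushMillis;
    final List<List<String>> flushedBatches = new ArrayList<>();

    BulkBufferSketch(int maxActions, long maxBytes, long intervalMillis, long startMillis) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = startMillis;
    }

    // Adds one request and flushes if ANY of the three limits is reached.
    void add(String request, long nowMillis) {
        buffer.add(request);
        bufferedBytes += request.getBytes(StandardCharsets.UTF_8).length;
        if (buffer.size() >= maxActions
                || bufferedBytes >= maxBytes
                || nowMillis - lastFlushMillis >= intervalMillis) {
            flush(nowMillis);
        }
    }

    // Sends the buffered requests as one batch (here: records them) and resets the counters.
    void flush(long nowMillis) {
        if (!buffer.isEmpty()) {
            flushedBatches.add(new ArrayList<>(buffer));
            buffer.clear();
            bufferedBytes = 0;
        }
        lastFlushMillis = nowMillis;
    }

    public static void main(String[] args) {
        // maxActions = 1, as in the examples: every element is sent immediately.
        BulkBufferSketch eager = new BulkBufferSketch(1, Long.MAX_VALUE, Long.MAX_VALUE, 0);
        eager.add("a", 0);
        eager.add("b", 1);
        System.out.println(eager.flushedBatches.size()); // 2

        // maxActions = 3: elements are buffered until the third one arrives.
        BulkBufferSketch batched = new BulkBufferSketch(3, Long.MAX_VALUE, Long.MAX_VALUE, 0);
        batched.add("a", 0);
        batched.add("b", 1);
        batched.add("c", 2);
        System.out.println(batched.flushedBatches.size()); // 1
    }
}
```

Setting `bulk.flush.max.actions` to `1` trades throughput for latency, since every element then becomes its own request.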

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/filesystem_sink.md b/docs/apis/streaming/connectors/filesystem_sink.md
deleted file mode 100644
index f2dc012..0000000
--- a/docs/apis/streaming/connectors/filesystem_sink.md
+++ /dev/null
@@ -1,133 +0,0 @@
----
-title: "HDFS Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 3
-sub-nav-title: Filesystem Sink
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that writes rolling files to any filesystem supported by
-Hadoop FileSystem. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-filesystem{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version}}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution.
See
-[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Rolling File Sink
-
-Both the rolling behaviour and the writing can be configured, as described further below.
-This is how you can create a default rolling sink:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<String> input = ...;
-
-input.addSink(new RollingSink<String>("/base/path"));
-
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[String] = ...
-
-input.addSink(new RollingSink[String]("/base/path"))
-
-{% endhighlight %}
-</div>
-</div>
-
-The only required parameter is the base path where the rolling files (buckets) will be
-stored. The sink can be configured by specifying a custom bucketer, writer and batch size.
-
-By default, the rolling sink will use the pattern `"yyyy-MM-dd--HH"` to name the rolling buckets.
-This pattern is passed to `SimpleDateFormat` with the current system time to form a bucket path. A
-new bucket will be created whenever the bucket path changes. For example, if you have a pattern
-that contains minutes as the finest granularity, you will get a new bucket every minute.
-Each bucket is itself a directory that contains several part files: each parallel instance
-of the sink will create its own part file, and when part files get too big the sink will also
-create a new part file next to the others. To specify a custom bucketer, use `setBucketer()`
-on a `RollingSink`.
-
-The default writer is `StringWriter`. This will call `toString()` on the incoming elements
-and write them to part files, separated by newlines. To specify a custom writer, use `setWriter()`
-on a `RollingSink`.
If you want to write Hadoop SequenceFiles, you can use the provided
-`SequenceFileWriter`, which can also be configured to use compression.
-
-The last configuration option is the batch size. This specifies when a part file should be closed
-and a new one started. The default part file size is 384 MB.
-
-Example:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple2<IntWritable, Text>> input = ...;
-
-RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<>("/base/path");
-sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
-sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
-sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB
-
-input.addSink(sink);
-
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
-
-val sink = new RollingSink[Tuple2[IntWritable, Text]]("/base/path")
-sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
-sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
-
-input.addSink(sink)
-
-{% endhighlight %}
-</div>
-</div>
-
-This will create a sink that writes to bucket files that follow this schema:
-
-```
-/base/path/{date-time}/part-{parallel-task}-{count}
-```
-
-where `date-time` is the string that we get from the date/time format, `parallel-task` is the index
-of the parallel sink instance, and `count` is the running number of part files that were created
-because of the batch size.
-
-For in-depth information, please refer to the JavaDoc for
-[RollingSink](http://flink.apache.org/docs/latest/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html).
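To make the bucket and part-file naming concrete, here is a small hypothetical helper that rebuilds paths following the documented schema `/base/path/{date-time}/part-{parallel-task}-{count}` with `SimpleDateFormat`. It is not Flink's `DateTimeBucketer`: it pins the time zone to UTC so that its output is reproducible (an assumption of this sketch), and its part-file estimate assumes each file is closed exactly when it reaches the batch size.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper mimicking the documented naming schema; it is not Flink code.
class BucketPathSketch {

    // Formats the timestamp with the bucket pattern, e.g. "yyyy-MM-dd--HH".
    static String bucketDir(String basePath, String pattern, long timestampMillis) {
        SimpleDateFormat format = new SimpleDateFormat(pattern);
        format.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: UTC for reproducible output
        return basePath + "/" + format.format(new Date(timestampMillis));
    }

    // Builds the part-file path for one parallel sink instance.
    static String partFile(String bucketDir, int parallelTask, int count) {
        return bucketDir + "/part-" + parallelTask + "-" + count;
    }

    // Rough estimate: part files one instance produces in one bucket (ceiling division).
    static long partFilesNeeded(long bytesWritten, long batchSizeBytes) {
        return (bytesWritten + batchSizeBytes - 1) / batchSizeBytes;
    }

    public static void main(String[] args) {
        long t = 0L; // 1970-01-01T00:00:00Z
        String hourly = bucketDir("/base/path", "yyyy-MM-dd--HH", t);
        System.out.println(partFile(hourly, 3, 0)); // /base/path/1970-01-01--00/part-3-0

        // A minute-granularity pattern yields a new bucket every minute.
        System.out.println(bucketDir("/base/path", "yyyy-MM-dd--HHmm", t + 90_000L)); // /base/path/1970-01-01--0001

        long batch = 1024L * 1024 * 400; // 400 MB batch size
        System.out.println(partFilesNeeded(1024L * 1024 * 1000, batch)); // 3
    }
}
```

Because the bucket path only depends on the formatted time, the granularity of the pattern alone decides how often a new directory is started.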

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/index.md b/docs/apis/streaming/connectors/index.md
deleted file mode 100644
index 83ca514..0000000
--- a/docs/apis/streaming/connectors/index.md
+++ /dev/null
@@ -1,47 +0,0 @@
----
-title: "Streaming Connectors"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-id: connectors
-sub-nav-pos: 6
-sub-nav-title: Connectors
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Connectors provide code for interfacing with various third-party systems.
-
-Currently these systems are supported:
-
- * [Apache Kafka](https://kafka.apache.org/) (sink/source)
- * [Elasticsearch](https://elastic.co/) (sink)
- * [Elasticsearch 2.x](https://elastic.co/) (sink)
- * [Hadoop FileSystem](http://hadoop.apache.org) (sink)
- * [RabbitMQ](http://www.rabbitmq.com/) (sink/source)
- * [Amazon Kinesis Streams](http://aws.amazon.com/kinesis/streams/) (sink/source)
- * [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) (source)
- * [Apache NiFi](https://nifi.apache.org) (sink/source)
- * [Apache Cassandra](https://cassandra.apache.org/) (sink)
- * [Redis](http://redis.io/) (sink)
-
-To run an application using one of these connectors, additional third-party
-components, e.g. the servers for the message queues, usually need to be
-installed and launched. Further instructions for these can be found in the
-corresponding subsections.

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kafka.md b/docs/apis/streaming/connectors/kafka.md
deleted file mode 100644
index e7cd05b..0000000
--- a/docs/apis/streaming/connectors/kafka.md
+++ /dev/null
@@ -1,293 +0,0 @@
----
-title: "Apache Kafka Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 1
-sub-nav-title: Kafka
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.
You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides access to event streams served by [Apache Kafka](https://kafka.apache.org/).
-
-Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to provide
-exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka's consumer group
-offset tracking, but tracks and checkpoints these offsets internally as well.
-
-Please pick a package (Maven artifact ID) and class name for your use case and environment.
-For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka-0.8`) is appropriate.
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">Maven Dependency</th>
-      <th class="text-left">Supported since</th>
-      <th class="text-left">Consumer and <br>
-      Producer Class name</th>
-      <th class="text-left">Kafka version</th>
-      <th class="text-left">Notes</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>flink-connector-kafka</td>
-        <td>0.9.1, 0.10</td>
-        <td>FlinkKafkaConsumer082<br>
-        FlinkKafkaProducer</td>
-        <td>0.8.x</td>
-        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally.
Offsets are committed to ZK by Flink.</td>
-    </tr>
-    <tr>
-        <td>flink-connector-kafka-0.8{{ site.scala_version_suffix }}</td>
-        <td>1.0.0</td>
-        <td>FlinkKafkaConsumer08<br>
-        FlinkKafkaProducer08</td>
-        <td>0.8.x</td>
-        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
-    </tr>
-    <tr>
-        <td>flink-connector-kafka-0.9{{ site.scala_version_suffix }}</td>
-        <td>1.0.0</td>
-        <td>FlinkKafkaConsumer09<br>
-        FlinkKafkaProducer09</td>
-        <td>0.9.x</td>
-        <td>Uses the new <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Consumer API</a> of Kafka.</td>
-    </tr>
-  </tbody>
-</table>
-
-Then, import the connector in your Maven project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kafka-0.8{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
-
-### Installing Apache Kafka
-
-* Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
-* On 32-bit computers, [this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in) problem may occur.
-* If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.

### Kafka Consumer

Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `FlinkKafkaConsumer09` for Kafka 0.9.0.x versions). It provides access to one or more Kafka topics.

The constructor accepts the following arguments:

1. The topic name / list of topic names
2. A `DeserializationSchema` / `KeyedDeserializationSchema` for deserializing the data from Kafka
3. Properties for the Kafka consumer.
   The following properties are required:
   - "bootstrap.servers" (comma-separated list of Kafka brokers)
   - "zookeeper.connect" (comma-separated list of Zookeeper servers) (**only required for Kafka 0.8**)
   - "group.id" (the id of the consumer group)

Example:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

DataStream<String> stream = env
	.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
// print() returns a DataStreamSink, so it is called separately
stream.print();
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val stream = env
    .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
stream.print()
{% endhighlight %}
</div>
</div>

The current `FlinkKafkaConsumer` implementation establishes a connection from the client (when the constructor is called)
to query the list of topics and partitions.

For this to work, the machine submitting the job to the Flink cluster must be able to reach the Kafka brokers.
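
Because the constructor contacts Kafka from the client, a quick connectivity check from the submitting machine can help when the consumer fails early. The following is a minimal, stdlib-only sketch, not part of Flink or Kafka: the class `BrokerReachability` and its methods are hypothetical, and it only tests whether a TCP connection to each `host:port` entry of a `bootstrap.servers` string succeeds. It says nothing about topic permissions or broker health.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink or Kafka): checks, from the machine that
 * submits the job, whether each broker in a "bootstrap.servers" string accepts TCP connections.
 */
final class BrokerReachability {

    /** Splits a comma-separated "host:port" list into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> brokers = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas such as "a:9092,"
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved defers the DNS lookup until we actually connect
            brokers.add(InetSocketAddress.createUnresolved(host, port));
        }
        return brokers;
    }

    /** Returns true if a TCP connection to the broker succeeds within timeoutMillis. */
    static boolean isReachable(InetSocketAddress broker, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Resolution happens here; an unknown host surfaces as an IOException
            socket.connect(new InetSocketAddress(broker.getHostString(), broker.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress broker : parse(servers)) {
            boolean ok = isReachable(broker, 2000);
            System.out.println(broker.getHostString() + ":" + broker.getPort()
                    + (ok ? " is reachable" : " is NOT reachable"));
        }
    }
}
```

Running `main` with a broker list such as `broker1:9092,broker2:9092` prints one line per broker. A plain socket connect was chosen to avoid depending on the Kafka client library; a successful connect only shows that the port is open.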

If you experience any issues with the Kafka consumer on the client side, the client log might contain information about failed requests, etc.

#### The `DeserializationSchema`

The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects. The
`DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
method gets called for each Kafka message, passing the value from Kafka.

It is usually helpful to start from the `AbstractDeserializationSchema`, which takes care of describing the
produced Java/Scala type to Flink's type system. Users that implement a vanilla `DeserializationSchema` need
to implement the `getProducedType(...)` method themselves.

For accessing both the key and the value of the Kafka message, the `KeyedDeserializationSchema` has
the following deserialize method: `T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`.

For convenience, Flink provides the following schemas:

1. `TypeInformationSerializationSchema` (and `TypeInformationKeyValueSerializationSchema`), which creates
   a schema based on Flink's `TypeInformation`. This is useful if the data is both written and read by Flink.
   This schema is a performant Flink-specific alternative to other generic serialization approaches.

2. `JSONDeserializationSchema` (and `JSONKeyValueDeserializationSchema`), which turns the serialized JSON
   into an `ObjectNode` object, from which fields can be accessed using `objectNode.get("field").asInt()`,
   `asText()`, and the other `as...()` methods.
   The `ObjectNode` produced by the key-value variant contains a "key" and a "value" field holding all fields, as well as
   an optional "metadata" field that exposes the offset, partition, and topic of the message.
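
To make the two `deserialize` contracts above concrete, here is a simplified, self-contained sketch. `ValueDeserializer` and `KeyedValueDeserializer` are hypothetical stand-ins that only mirror the quoted signatures; Flink's real `DeserializationSchema` and `KeyedDeserializationSchema` declare further methods (for example `getProducedType()`, mentioned above), so these classes cannot be passed to a `FlinkKafkaConsumer08` as-is.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified stand-in for DeserializationSchema: value bytes to T. */
interface ValueDeserializer<T> {
    T deserialize(byte[] message);
}

/** Hypothetical, simplified stand-in for KeyedDeserializationSchema. */
interface KeyedValueDeserializer<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
}

/** Decodes every message value as a UTF-8 string. */
final class Utf8ValueDeserializer implements ValueDeserializer<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

/** Hypothetical result type: decoded key/value plus the message's position in Kafka. */
final class DecodedMessage {
    final String key; // null when the Kafka message has no key
    final String value;
    final String topic;
    final int partition;
    final long offset;

    DecodedMessage(String key, String value, String topic, int partition, long offset) {
        this.key = key;
        this.value = value;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    @Override
    public String toString() {
        return topic + "/" + partition + "@" + offset + " " + key + "=" + value;
    }
}

/** Decodes key and value as UTF-8 and keeps the topic/partition/offset metadata. */
final class Utf8KeyedDeserializer implements KeyedValueDeserializer<DecodedMessage> {
    @Override
    public DecodedMessage deserialize(byte[] messageKey, byte[] message,
                                      String topic, int partition, long offset) {
        String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
        String value = new String(message, StandardCharsets.UTF_8);
        return new DecodedMessage(key, value, topic, partition, offset);
    }
}
```

The keyed variant guards against a `null` key, since Kafka messages are not required to carry one.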

#### Kafka Consumers and Fault Tolerance

With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore
the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were
stored in the checkpoint.

The checkpoint interval therefore defines how far back, at most, the program may have to go in case of a failure.

To use fault-tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
{% endhighlight %}
</div>
</div>

Also note that Flink can only restart the topology if enough processing slots are available.
So if the topology fails due to the loss of a TaskManager, there must still be enough slots available afterwards.
Flink on YARN supports automatic restart of lost YARN containers.

If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.

#### Kafka Consumers and Timestamp Extraction/Watermark Emission

In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself.
In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g.
based on
special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka
Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an `AssignerWithPunctuatedWatermarks`.

You can specify your custom timestamp extractor/watermark emitter as described
[here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), or use one of the
[predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so, you
can pass it to your consumer in the following way:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

DataStream<String> stream = env.addSource(myConsumer);
stream.print();
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())

val stream = env.addSource(myConsumer)
stream.print()
{% endhighlight %}
</div>
</div>

Internally, an instance of the assigner is executed per Kafka partition.
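
To illustrate what running one assigner per partition means, here is a simplified, stdlib-only model. It is not Flink's code: the classes `BoundedLatenessAssigner` and `PerPartitionWatermarks` are hypothetical, the assigner uses a bounded out-of-orderness rule purely as an example, and the model assumes that the consumer forwards the minimum of its partitions' watermarks, so a partition without data holds the combined watermark back.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical periodic assigner: watermark = highest timestamp seen minus a fixed bound. */
final class BoundedLatenessAssigner {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE; // no record seen yet

    BoundedLatenessAssigner(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Plays the role of extractTimestamp(...): remembers and returns the record's timestamp. */
    long extractTimestamp(long recordTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, recordTimestamp);
        return recordTimestamp;
    }

    /** Plays the role of getCurrentWatermark(); Long.MIN_VALUE until a record has been seen. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - maxOutOfOrderness;
    }
}

/** Hypothetical model of one consumer instance that runs an assigner per Kafka partition. */
final class PerPartitionWatermarks {
    private final Map<Integer, BoundedLatenessAssigner> assigners = new HashMap<>();

    PerPartitionWatermarks(long maxOutOfOrderness, int... partitions) {
        for (int partition : partitions) {
            assigners.put(partition, new BoundedLatenessAssigner(maxOutOfOrderness));
        }
    }

    /** Routes a record's timestamp to the assigner of the partition it was read from. */
    void onRecord(int partition, long timestamp) {
        BoundedLatenessAssigner assigner = assigners.get(partition);
        if (assigner == null) {
            throw new IllegalArgumentException("Partition not assigned: " + partition);
        }
        assigner.extractTimestamp(timestamp);
    }

    /** Assumed merge rule: the instance can only advance as far as its slowest partition. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (BoundedLatenessAssigner assigner : assigners.values()) {
            min = Math.min(min, assigner.currentWatermark());
        }
        return assigners.isEmpty() ? Long.MIN_VALUE : min;
    }
}
```

With a bound of 1000 ms and partitions 0 and 1, a record with timestamp 5000 on partition 0 alone leaves the combined watermark at `Long.MIN_VALUE`, because partition 1 has produced nothing yet; once partition 1 sees timestamp 3000, the combined watermark becomes 2000.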

When such an assigner is specified, for each record read from Kafka, its
`extractTimestamp(T element, long previousElementTimestamp)` method is called to assign a timestamp to the record, and
`Watermark getCurrentWatermark()` (for periodic) or
`Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` (for punctuated) is called to determine
whether a new watermark should be emitted and with which timestamp.

### Kafka Producer

The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
records to partitions.

Example:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
{% endhighlight %}
</div>
</div>

You can also pass a custom Kafka producer configuration to the `FlinkKafkaProducer08` through its constructor. Please refer to
the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
Kafka producers.

Similar to the consumer, the producer also allows using an advanced serialization schema that
serializes the key and value separately. It also allows overriding the target topic, so that
one producer instance can send data to multiple topics.

The interface of the serialization schema is called `KeyedSerializationSchema`.

**Note**: By default, the number of retries is set to "0". This means that the producer fails immediately on errors,
including leader changes. The value is set to "0" by default to avoid duplicate messages in the target topic.
For most production environments with frequent broker changes, we recommend setting the number of retries to a
higher value.
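
Following the note above, here is a minimal sketch of a producer configuration with retries enabled, using only `java.util.Properties`. `bootstrap.servers` and `retries` are standard Kafka producer settings; the helper `ProducerConfigSketch` is hypothetical, and the value 3 is an arbitrary example rather than a recommendation.

```java
import java.util.Properties;

/** Hypothetical helper that builds a Kafka producer configuration with retries enabled. */
final class ProducerConfigSketch {

    static Properties producerProperties(String bootstrapServers, int retries) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must not be negative");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Retry transient errors such as leader changes instead of failing immediately.
        // Trade-off: a retried send may write a duplicate message to the target topic.
        props.setProperty("retries", Integer.toString(retries));
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("localhost:9092", 3);
        // Assumption: these properties are handed to the producer through a constructor
        // overload that accepts a Properties object; check your connector version's API.
        props.list(System.out);
    }
}
```

Raising `retries` trades the fail-fast behavior for possible duplicates in the target topic, which is why the default is 0.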

There is currently no transactional producer for Kafka, so Flink cannot guarantee exactly-once delivery
into a Kafka topic.
