http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kinesis.md b/docs/apis/streaming/connectors/kinesis.md
deleted file mode 100644
index 54a75db..0000000
--- a/docs/apis/streaming/connectors/kinesis.md
+++ /dev/null
@@ -1,322 +0,0 @@
----
-title: "Amazon AWS Kinesis Streams Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 5
-sub-nav-title: Amazon Kinesis Streams
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-The Kinesis connector provides access to [Amazon AWS Kinesis Streams](http://aws.amazon.com/kinesis/streams/).
-
-To use the connector, add the following Maven dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kinesis{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-**The `flink-connector-kinesis{{ site.scala_version_suffix }}` artifact has a dependency on code licensed under the [Amazon Software License](https://aws.amazon.com/asl/) (ASL).
-Linking to the flink-connector-kinesis will include ASL licensed code in your application.**
-
-The `flink-connector-kinesis{{ site.scala_version_suffix }}` artifact is not deployed to Maven central as part of
-Flink releases because of the licensing issue. Therefore, you need to build the connector yourself from the source.
-
-Download the Flink source or check it out from the git repository. Then, use the following Maven command to build the module:
-{% highlight bash %}
-mvn clean install -Pinclude-kinesis -DskipTests
-# In Maven 3.3 the shading of flink-dist doesn't work properly in one run, so we need to run mvn for flink-dist again.
-cd flink-dist
-mvn clean install -Pinclude-kinesis -DskipTests
-{% endhighlight %}
-
-
-The streaming connectors are not part of the binary distribution. See how to link with them for cluster
-execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
-
-### Using the Amazon Kinesis Streams Service
-Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
-to set up Kinesis streams. Make sure to create the appropriate IAM policy and user to read from / write to the Kinesis streams.
-
-### Kinesis Consumer
-
-The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
-streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is
-responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
-change as shards are closed and created by Kinesis.
-
-Before consuming data from Kinesis streams, make sure that all streams are created with the status "ACTIVE" in the AWS dashboard.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Properties consumerConfig = new Properties();
-consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
-
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val consumerConfig = new Properties();
-consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
-
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-val kinesis = env.addSource(new FlinkKinesisConsumer[String](
-    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
-{% endhighlight %}
-</div>
-</div>
-
-The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
-instance, the configuration keys for which can be found in `ConsumerConfigConstants`. The example
-demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method, in which
-the AWS access key ID and secret access key are set directly in the configuration (the other options are to set
-`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, or `AUTO`). Data is consumed
-from the newest position in the Kinesis stream (the other option is to set `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
-to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
-
-Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
-
-#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
-
-With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
-periodically checkpoint each shard's progress. In case of a job failure, Flink will restore the streaming program to the
-state of the latest complete checkpoint and re-consume the records from Kinesis shards, starting from the progress that
-was stored in the checkpoint.
-
-The checkpoint interval therefore defines how far back the program may have to go, at most, in case of a failure.
-
-To use fault tolerant Kinesis Consumers, checkpointing of the topology needs to be enabled at the execution environment:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-</div>
-
-Also note that Flink can only restart the topology if enough processing slots are available.
-Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
-Flink on YARN supports automatic restart of lost YARN containers.
-
-#### Event Time for Consumed Records
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
-timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
-have been successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
-timestamp, and there are no guarantees about its accuracy or order correctness (i.e., the timestamps may not always be
-ascending).
-
-Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
-or use one of the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
-it can be passed to the consumer in the following way:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
-kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val kinesis = env.addSource(new FlinkKinesisConsumer[String](
-    "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
-// a val cannot be reassigned, so bind the result to a new name
-val kinesisWithTimestamps = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner)
-{% endhighlight %}
-</div>
-</div>
-
-#### Threading Model
-
-The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption.
-
-For shard discovery, each parallel consumer subtask will have a single thread that constantly queries Kinesis for shard
-information, even if the subtask initially did not have shards to read from when the consumer was started. In other words, if
-the consumer is run with a parallelism of 10, there will be a total of 10 threads constantly querying Kinesis regardless
-of the total number of shards in the subscribed streams.
-
-For data consumption, a single thread will be created to consume each discovered shard. A thread terminates when the
-shard it is responsible for consuming is closed as a result of stream resharding. In other words, there will always be
-one thread per open shard.
-
-#### Internally Used Kinesis APIs
-
-The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs
-for shard discovery and data consumption. Due to Amazon's [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
-on the APIs, the consumer will be competing with other non-Flink consuming applications that the user may be running.
-Below is a list of APIs called by the consumer with a description of how the consumer uses each API, as well as information
-on how to deal with any errors or warnings that the Flink Kinesis Consumer may have due to these service limits.
-
-- *[DescribeStream](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)*: this is constantly called
-by a single thread in each parallel consumer subtask to discover any new shards as a result of stream resharding. By default,
-the consumer performs the shard discovery at an interval of 10 seconds, and will retry indefinitely until it gets a result
-from Kinesis. If this interferes with other non-Flink consuming applications, users can reduce how often the consumer
-calls this API by setting a value for `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the supplied
-configuration properties. This sets the discovery interval to a different value. Note that this setting directly impacts
-the maximum delay of discovering a new shard and starting to consume it, as shards will not be discovered during the interval.
-
-- *[GetShardIterator](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)*: this is called
-only once when per shard consuming threads are started, and will retry if Kinesis complains that the transaction limit for the
-API has been exceeded, up to a default of 3 attempts. Note that since the rate limit for this API is per shard (not per stream),
-the consumer itself should not exceed the limit. Usually, if this happens, users can either try to slow down any other
-non-Flink consuming applications that call this API, or modify the retry behaviour of this API call in the consumer by
-setting keys prefixed by `ConsumerConfigConstants.SHARD_GETITERATOR_*` in the supplied configuration properties.
-
-- *[GetRecords](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)*: this is constantly called
-by per shard consuming threads to fetch records from Kinesis. When a shard has multiple concurrent consumers (when there
-are any other non-Flink consuming applications running), the per shard rate limit may be exceeded. By default, on each call
-of this API, the consumer will retry if Kinesis complains that the data size / transaction limit for the API has been exceeded,
-up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput
-of the consumer by setting the `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and
-`ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties. Setting the former
-adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 100), while
-the latter modifies the sleep interval between each fetch (there will be no sleep by default). The retry behaviour of the
-consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
-
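The bounded retry behaviour described for `GetShardIterator` and `GetRecords` can be pictured with a small, self-contained sketch. This is not the connector's implementation: the class `BoundedRetrySketch`, the exception `LimitExceededException`, and the linear backoff are all assumptions made for illustration; only the idea of "retry on a limit error, up to 3 attempts" comes from the text above.

```java
import java.util.concurrent.Callable;

public class BoundedRetrySketch {

    /** Hypothetical stand-in for a "limit exceeded" error returned by a remote API. */
    public static class LimitExceededException extends RuntimeException {
        public LimitExceededException(String message) {
            super(message);
        }
    }

    /**
     * Runs the operation and retries it when it fails with LimitExceededException,
     * pausing a little longer before each new attempt. At most maxAttempts calls
     * are made; the last failure is rethrown if none of them succeeds.
     */
    public static <T> T callWithRetries(Callable<T> operation, int maxAttempts, long baseBackoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        LimitExceededException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (LimitExceededException e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    // Linear backoff (an assumption): wait longer after each failed attempt.
                    Thread.sleep(baseBackoffMillis * attempt);
                }
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Simulated API call that is throttled twice and then succeeds.
        String result = callWithRetries(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new LimitExceededException("rate exceeded");
            }
            return "records";
        }, 3, 10L);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

In the connector itself, the number of attempts and the pauses are controlled through the `ConsumerConfigConstants.SHARD_GETITERATOR_*` and `ConsumerConfigConstants.SHARD_GETRECORDS_*` keys mentioned above rather than through method arguments.
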
-### Kinesis Producer
-
-The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer does not participate in
-Flink's checkpointing and doesn't provide exactly-once processing guarantees.
-Also, the Kinesis producer does not guarantee that records are written in order to the shards (see [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).
-
-In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
-
-To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
-
-For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Properties producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-
-FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
-kinesis.setFailOnError(true);
-kinesis.setDefaultStream("kinesis_stream_name");
-kinesis.setDefaultPartition("0");
-
-DataStream<String> simpleStringStream = ...;
-simpleStringStream.addSink(kinesis);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-
-val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig);
-kinesis.setFailOnError(true);
-kinesis.setDefaultStream("kinesis_stream_name");
-kinesis.setDefaultPartition("0");
-
-val simpleStringStream = ...;
-simpleStringStream.addSink(kinesis);
-{% endhighlight %}
-</div>
-</div>
-
-The above is a simple example of using the producer. Configuration for the producer with the mandatory configuration values is supplied with a `java.util.Properties`
-instance as described above for the consumer. The example demonstrates producing to a single Kinesis stream in the AWS region "us-east-1".
-
-Instead of a `SerializationSchema`, the producer also supports a `KinesisSerializationSchema`. The `KinesisSerializationSchema` allows sending the data to multiple streams. This is
-done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream.
-Otherwise, the returned stream name is used.
-
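The `null`-means-default rule of `getTargetStream` can be sketched without any Flink classes. The class `TargetStreamSketch`, the method `resolveStream`, and the stream names below are hypothetical and exist only for illustration; the sketch shows the routing decision, not the producer's actual code.

```java
import java.util.function.Function;

public class TargetStreamSketch {

    /**
     * Returns the stream chosen for the element, or the default stream when the
     * chooser returns null (mirroring the rule described for getTargetStream).
     */
    public static <T> String resolveStream(T element, Function<T, String> targetStreamChooser,
                                           String defaultStream) {
        String target = targetStreamChooser.apply(element);
        return target != null ? target : defaultStream;
    }

    public static void main(String[] args) {
        // Hypothetical routing rule: error lines go to their own stream, everything else to the default.
        Function<String, String> chooser = line -> line.startsWith("ERROR") ? "error_stream" : null;

        System.out.println(resolveStream("ERROR disk full", chooser, "kinesis_stream_name"));
        System.out.println(resolveStream("INFO started", chooser, "kinesis_stream_name"));
    }
}
```

With a real `KinesisSerializationSchema`, the same decision would live inside `getTargetStream(T element)`, and the default stream is the one set with `setDefaultStream(...)` in the producer example above.
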
-Other optional configuration keys for the producer can be found in `ProducerConfigConstants`.
-               
-               
-### Using Non-AWS Kinesis Endpoints for Testing
-
-It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as
-[Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink
-application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.
-
-To override the AWS endpoint, taking the producer for example, set the `ProducerConfigConstants.AWS_ENDPOINT` property in the
-Flink configuration, in addition to the `ProducerConfigConstants.AWS_REGION` required by Flink. Although the region is
-required, it will not be used to determine the AWS endpoint URL.
-
-The following example shows how one might supply the `ProducerConfigConstants.AWS_ENDPOINT` configuration property:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Properties producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
-{% endhighlight %}
-</div>
-</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/nifi.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/nifi.md b/docs/apis/streaming/connectors/nifi.md
deleted file mode 100644
index a47b8f0..0000000
--- a/docs/apis/streaming/connectors/nifi.md
+++ /dev/null
@@ -1,141 +0,0 @@
----
-title: "Apache NiFi Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 7
-sub-nav-title: Apache NiFi
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Source and Sink that can read from and write to 
-[Apache NiFi](https://nifi.apache.org/). To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-nifi{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Apache NiFi
-
-Instructions for setting up an Apache NiFi cluster can be found
-[here](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi).
-
-#### Apache NiFi Source
-
-The connector provides a Source for reading data from Apache NiFi to Apache Flink.
-
-The class `NiFiSource(…)` provides 2 constructors for reading data from NiFi.
-
-- `NiFiSource(SiteToSiteClientConfig config)` - Constructs a `NiFiSource(…)` given the client's `SiteToSiteClientConfig` and a
-     default wait time of 1000 ms.
-
-- `NiFiSource(SiteToSiteClientConfig config, long waitTimeMs)` - Constructs a `NiFiSource(…)` given the client's
-     `SiteToSiteClientConfig` and the specified wait time (in milliseconds).
-     
-Example:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
-        .url("http://localhost:8080/nifi")
-        .portName("Data for Flink")
-        .requestBatchCount(5)
-        .buildConfig();
-
-SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
-       .url("http://localhost:8080/nifi")
-       .portName("Data for Flink")
-       .requestBatchCount(5)
-       .buildConfig()
-       
-val nifiSource = new NiFiSource(clientConfig)       
-{% endhighlight %}       
-</div>
-</div>
-
-Here data is read from the Apache NiFi Output Port called "Data for Flink", which is part of the Apache NiFi
-Site-to-Site protocol configuration.
- 
-#### Apache NiFi Sink
-
-The connector provides a Sink for writing data from Apache Flink to Apache NiFi.
-
-The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`.
-
-- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)` constructs a `NiFiSink(…)` given the client's `SiteToSiteClientConfig` and a `NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be ingested by NiFi.
-      
-Example:
-      
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
-        .url("http://localhost:8080/nifi")
-        .portName("Data from Flink")
-        .requestBatchCount(5)
-        .buildConfig();
-
-SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});
-
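-// Sinks are attached to a DataStream, not to the environment.
-DataStream<NiFiDataPacket> dataStream = ...;
-dataStream.addSink(nifiSink);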
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
-       .url("http://localhost:8080/nifi")
-       .portName("Data from Flink")
-       .requestBatchCount(5)
-       .buildConfig()
-
-val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder[T]() {...})
-
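-// Sinks are attached to a DataStream, not to the environment.
-val dataStream: DataStream[NiFiDataPacket] = ...
-dataStream.addSink(nifiSink)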
-{% endhighlight %}       
-</div>
-</div>      
-
-More information about the [Apache NiFi](https://nifi.apache.org) Site-to-Site Protocol can be found [here](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/rabbitmq.md b/docs/apis/streaming/connectors/rabbitmq.md
deleted file mode 100644
index 0e186e0..0000000
--- a/docs/apis/streaming/connectors/rabbitmq.md
+++ /dev/null
@@ -1,132 +0,0 @@
----
-title: "RabbitMQ Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 4
-sub-nav-title: RabbitMQ
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/). To use this connector, add the following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-rabbitmq{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
-
-#### Installing RabbitMQ
-Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.com/download.html). After the installation, the server starts automatically, and the application connecting to RabbitMQ can be launched.
-
-#### RabbitMQ Source
-
-A class which provides an interface for receiving data from RabbitMQ.
-
-The following have to be provided for the `RMQSource(…)` constructor, in order:
-
-- RMQConnectionConfig.
-- queueName: The RabbitMQ queue name.
-- usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`).
-- deserializationSchema: Deserialization schema to turn messages into Java objects.
-
-This source can be operated in three different modes:
-
-1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
-    unique correlation IDs.
-2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
-    (correlation id is not set).
-3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
-
-Correlation ids are a RabbitMQ application feature. You have to set them in the message properties
-when injecting messages into RabbitMQ. If you set `usesCorrelationId` to true and do not supply
-unique correlation ids, the source will throw an exception (if the correlation id is null) or ignore
-messages with non-unique correlation ids. If you set `usesCorrelationId` to false, then you don't
-have to supply correlation ids.
-
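The correlation-id rule above (reject a missing id, drop a repeated one) can be sketched with plain Java collections. The class `CorrelationIdFilterSketch` and its `accept` method are hypothetical and heavily simplified; in particular, how the real source scopes the set of seen ids to checkpoints is not modelled here.

```java
import java.util.HashSet;
import java.util.Set;

public class CorrelationIdFilterSketch {

    // Correlation ids that have already been accepted.
    private final Set<String> seenIds = new HashSet<>();

    /**
     * Returns true if a message with this correlation id should be emitted,
     * false if the id was seen before (the message is ignored as a duplicate).
     * A missing id is treated as an error, as described for usesCorrelationId = true.
     */
    public boolean accept(String correlationId) {
        if (correlationId == null) {
            throw new IllegalStateException("usesCorrelationId is true but the message has no correlation id");
        }
        // Set.add returns false when the element is already present.
        return seenIds.add(correlationId);
    }

    public static void main(String[] args) {
        CorrelationIdFilterSketch filter = new CorrelationIdFilterSketch();
        System.out.println(filter.accept("msg-1")); // first delivery
        System.out.println(filter.accept("msg-2")); // different id
        System.out.println(filter.accept("msg-1")); // redelivery of msg-1
    }
}
```

This is why exactly-once mode depends on the sender supplying unique ids: two distinct messages that share an id are indistinguishable from a redelivery, so the second one is dropped.
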
-Example:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build();
-DataStream<String> streamWithoutCorrelationIds = env
-       .addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema()));
-streamWithoutCorrelationIds.print();
-
-DataStream<String> streamWithCorrelationIds = env
-       .addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema()));
-streamWithCorrelationIds.print();
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build()
-val streamWithoutCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema))
-streamWithoutCorrelationIds.print()
-
-val streamWithCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema))
-streamWithCorrelationIds.print()
-{% endhighlight %}
-</div>
-</div>
-
-#### RabbitMQ Sink
-A class providing an interface for sending data to RabbitMQ.
-
-The following have to be provided for the `RMQSink(…)` constructor, in order:
-
-1. RMQConnectionConfig
-2. The queue name
-3. Serialization schema
-
-Example:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build();
-stream.addSink(new RMQSink<String>(connectionConfig, "hello", new SimpleStringSchema()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build()
-stream.addSink(new RMQSink[String](connectionConfig, "hello", new SimpleStringSchema))
-{% endhighlight %}
-</div>
-</div>
-
-More about RabbitMQ can be found [here](http://www.rabbitmq.com/).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/redis.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/redis.md b/docs/apis/streaming/connectors/redis.md
deleted file mode 100644
index dfa5296..0000000
--- a/docs/apis/streaming/connectors/redis.md
+++ /dev/null
@@ -1,177 +0,0 @@
----
-title: "Redis Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 6
-sub-nav-title: Redis
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to [Redis](http://redis.io/) and
-publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the
-following dependency to your project:
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-Version Compatibility: This module is compatible with Redis 2.8.5.
-
-Note that the streaming connectors are currently not part of the binary 
distribution. You need to link them for cluster execution 
[explicitly]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
-
-#### Installing Redis
-Follow the instructions from the [Redis download 
page](http://redis.io/download).
-
-#### Redis Sink
-A class providing an interface for sending data to Redis.
-The sink can communicate with three different types of Redis environments:
-1. Single Redis Server
-2. Redis Cluster
-3. Redis Sentinel
-
-This code shows how to create a sink that communicates with a single Redis server:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
-
-    @Override
-    public RedisCommandDescription getCommandDescription() {
-        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
-    }
-
-    @Override
-    public String getKeyFromData(Tuple2<String, String> data) {
-        return data.f0;
-    }
-
-    @Override
-    public String getValueFromData(Tuple2<String, String> data) {
-        return data.f1;
-    }
-}
-FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class RedisExampleMapper extends RedisMapper[(String, String)]{
-  override def getCommandDescription: RedisCommandDescription = {
-    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
-  }
-
-  override def getKeyFromData(data: (String, String)): String = data._1
-
-  override def getValueFromData(data: (String, String)): String = data._2
-}
-val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-This example code does the same, but for Redis Cluster:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder()
-    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-This example shows the configuration for a Redis environment with Sentinels:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
-    .setMasterName("master").setSentinels(...).build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-The following table lists the available data types and the Redis command used for each of them.
-
-<table class="table table-bordered" style="width: 75%">
-    <thead>
-        <tr>
-          <th class="text-center" style="width: 20%">Data Type</th>
-          <th class="text-center" style="width: 25%">Redis Command [Sink]</th>
-          <th class="text-center" style="width: 25%">Redis Command [Source]</th>
-        </tr>
-      </thead>
-      <tbody>
-        <tr>
-            <td>HASH</td><td><a href="http://redis.io/commands/hset">HSET</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>LIST</td><td>
-                <a href="http://redis.io/commands/rpush">RPUSH</a>,
-                <a href="http://redis.io/commands/lpush">LPUSH</a>
-            </td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>SET</td><td><a href="http://redis.io/commands/sadd">SADD</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>PUBSUB</td><td><a href="http://redis.io/commands/publish">PUBLISH</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>STRING</td><td><a href="http://redis.io/commands/set">SET</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>HYPER_LOG_LOG</td><td><a href="http://redis.io/commands/pfadd">PFADD</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>SORTED_SET</td><td><a href="http://redis.io/commands/zadd">ZADD</a></td><td>--NA--</td>
-        </tr>
-      </tbody>
-</table>
-More about Redis can be found [here](http://redis.io/).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/twitter.md 
b/docs/apis/streaming/connectors/twitter.md
deleted file mode 100644
index 9e84481..0000000
--- a/docs/apis/streaming/connectors/twitter.md
+++ /dev/null
@@ -1,89 +0,0 @@
----
-title: "Twitter Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 5
-sub-nav-title: Twitter
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-The Twitter Streaming API provides access to the stream of tweets made 
available by Twitter. 
-Flink Streaming comes with a built-in `TwitterSource` class for establishing a 
connection to this stream. 
-To use this connector, add the following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-twitter{{ site.scala_version_suffix 
}}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary 
distribution. 
-See how to link with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
-
-#### Authentication
-In order to connect to the Twitter stream the user has to register their 
program and acquire the necessary information for the authentication. The 
process is described below.
-
-#### Acquiring the authentication information
-First of all, a Twitter account is needed. Sign up for free at 
[twitter.com/signup](https://twitter.com/signup) 
-or sign in at Twitter's [Application Management](https://apps.twitter.com/) 
and register the application by 
-clicking on the "Create New App" button. Fill out a form about your program 
and accept the Terms and Conditions.
-After selecting the application, the API key and API secret (called 
`twitter-source.consumerKey` and `twitter-source.consumerSecret` in 
`TwitterSource` respectively) are located on the "API Keys" tab. 
-The necessary OAuth Access Token data (`twitter-source.token` and 
`twitter-source.tokenSecret` in `TwitterSource`) can be generated and acquired 
on the "Keys and Access Tokens" tab.
-Remember to keep these pieces of information secret and do not push them to 
public repositories.
- 
- 
-
-#### Usage
-In contrast to other connectors, the `TwitterSource` depends on no additional 
services. For example the following code should run gracefully:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Properties props = new Properties();
-props.setProperty(TwitterSource.CONSUMER_KEY, "");
-props.setProperty(TwitterSource.CONSUMER_SECRET, "");
-props.setProperty(TwitterSource.TOKEN, "");
-props.setProperty(TwitterSource.TOKEN_SECRET, "");
-DataStream<String> streamSource = env.addSource(new TwitterSource(props));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val props = new Properties()
-props.setProperty(TwitterSource.CONSUMER_KEY, "")
-props.setProperty(TwitterSource.CONSUMER_SECRET, "")
-props.setProperty(TwitterSource.TOKEN, "")
-props.setProperty(TwitterSource.TOKEN_SECRET, "")
-val streamSource: DataStream[String] = env.addSource(new TwitterSource(props))
-{% endhighlight %}
-</div>
-</div>
-
-The `TwitterSource` emits strings containing a JSON object, representing a 
Tweet.
-
-The `TwitterExample` class in the `flink-examples-streaming` package shows a full example of how to use the `TwitterSource`.
-
-By default, the `TwitterSource` uses the `StatusesSampleEndpoint`. This 
endpoint returns a random sample of Tweets.
-There is a `TwitterSource.EndpointInitializer` interface allowing users to 
provide a custom endpoint.
-

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/event_time.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/event_time.md 
b/docs/apis/streaming/event_time.md
deleted file mode 100644
index 7f94d68..0000000
--- a/docs/apis/streaming/event_time.md
+++ /dev/null
@@ -1,208 +0,0 @@
----
-title: "Event Time"
-
-sub-nav-id: eventtime
-sub-nav-group: streaming
-sub-nav-pos: 3
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* toc
-{:toc}
-
-# Event Time / Processing Time / Ingestion Time
-
-Flink supports different notions of *time* in streaming programs.
-
-- **Processing time:** Processing time refers to the system time of the 
machine that is executing the
-    respective operation.
-
-    When a streaming program runs on processing time, all time-based 
operations (like time windows) will
-    use the system clock of the machines that run the respective operator. For 
example, an hourly
-    processing time window will include all records that arrived at a specific 
operator between the
-    times when the system clock indicated the full hour.
-
-    Processing time is the simplest notion of time and requires no 
coordination between streams and machines.
-    It provides the best performance and the lowest latency. However, in 
distributed and asynchronous
-    environments processing time does not provide determinism, because it is 
susceptible to the speed at which
-    records arrive in the system (for example from the message queue), and to 
the speed at which the
-    records flow between operators inside the system.
-
-- **Event time:** Event time is the time that each individual event occurred 
on its producing device.
-    This time is typically embedded within the records before they enter Flink 
and that *event timestamp*
-    can be extracted from the record. An hourly event time window will contain 
all records that carry an
-    event timestamp that falls into that hour, regardless of when the records 
arrive, and in what order
-    they arrive.
-
-    Event time gives correct results even on out-of-order events, late events, 
or on replays
-    of data from backups or persistent logs. In event time, the progress of 
time depends on the data,
-    not on any wall clocks. Event time programs must specify how to generate 
*Event Time Watermarks*,
-    which is the mechanism that signals time progress in event time. The 
mechanism is
-    described below.
-
-    Event time processing often incurs a certain latency, due to its nature of waiting a certain time for
-    late events and out-of-order events. Because of that, event time programs 
are often combined with
-    *processing time* operations.
-
-- **Ingestion time:** Ingestion time is the time that events enter Flink. At 
the source operator, each
-    record gets the source's current time as a timestamp, and time-based 
operations (like time windows)
-    refer to that timestamp.
-
-    *Ingestion Time* sits conceptually in between *Event Time* and *Processing 
Time*. Compared to
-    *Processing Time*, it is slightly more expensive, but gives more 
predictable results: Because
-    *Ingestion Time* uses stable timestamps (assigned once at the source), 
different window operations
-    over the records will refer to the same timestamp, whereas in *Processing 
Time* each window operator
-    may assign the record to a different window (based on the local system 
clock and any transport delay).
-
-    Compared to *Event Time*, *Ingestion Time* programs cannot handle any out-of-order events or late data,
-    but the programs don't have to specify how to generate *Watermarks*.
-
-    Internally, *Ingestion Time* is treated much like event time, with 
automatic timestamp assignment and
-    automatic Watermark generation.
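The event time window rule described above (a record belongs to the hourly window that its own timestamp falls into, regardless of arrival order) can be illustrated with a small sketch that does not depend on Flink. The class and method names (`HourlyWindows`, `windowStart`) are hypothetical and not part of Flink's API; the sketch assumes timestamps in milliseconds since the epoch and windows aligned to the epoch.

```java
import java.time.Duration;

public class HourlyWindows {

    // Length of one window in milliseconds (hypothetical constant for this sketch).
    public static final long HOUR_MS = Duration.ofHours(1).toMillis();

    /**
     * Returns the inclusive start of the hourly window that contains the given
     * event timestamp. floorDiv keeps the alignment correct for negative values.
     */
    public static long windowStart(long eventTimestampMs) {
        return Math.floorDiv(eventTimestampMs, HOUR_MS) * HOUR_MS;
    }

    /** Two records share a window exactly when their window starts are equal. */
    public static boolean sameWindow(long timestampA, long timestampB) {
        return windowStart(timestampA) == windowStart(timestampB);
    }
}
```

Because the window is derived from the timestamp alone, a record that arrives late or out of order is still mapped to the same window as its neighbors in event time.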
-
-<img src="fig/times_clocks.svg" class="center" width="80%" />
-
-
-### Setting a Time Characteristic
-
-The first part of a Flink DataStream program is usually to set the base *time 
characteristic*. That setting
-defines how data stream sources behave (for example whether to assign 
timestamps), and what notion of
-time the window operations like `KeyedStream.timeWindow(Time.seconds(30))` 
refer to.
-
-The following example shows a Flink program that aggregates events in hourly 
time windows. The behavior of the
-windows adapts with the time characteristic.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-DataStream<MyEvent> stream = env.addSource(new 
FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
-
-stream
-    .keyBy( (event) -> event.getUser() )
-    .timeWindow(Time.hours(1))
-    .reduce( (a, b) -> a.add(b) )
-    .addSink(...);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-val stream: DataStream[MyEvent] = env.addSource(new 
FlinkKafkaConsumer09[MyEvent](topic, schema, props))
-
-stream
-    .keyBy( _.getUser )
-    .timeWindow(Time.hours(1))
-    .reduce( (a, b) => a.add(b) )
-    .addSink(...)
-{% endhighlight %}
-</div>
-</div>
-
-
-Note that in order to run this example in *Event Time*, the program needs to 
use either an event time
-source, or inject a *Timestamp Assigner & Watermark Generator*. Those functions describe how to access
-the event timestamps, and how much out-of-orderness the event stream exhibits.
-
-The section below describes the general mechanism behind *Timestamps* and 
*Watermarks*. For a guide on how
-to use timestamp assignment and watermark generation in the Flink DataStream 
API, please refer to
-[Generating Timestamps / Watermarks]({{ site.baseurl 
}}/apis/streaming/event_timestamps_watermarks.html)
-
-
-# Event Time and Watermarks
-
-*Note: Flink implements many techniques from the Dataflow Model. For a good introduction to event time and watermarks, also have a look at these articles:*
-
-  - [Streaming 
101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by 
Tyler Akidau
-  - The [Dataflow Model 
paper](https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/43864.pdf)
-
-
-A stream processor that supports *event time* needs a way to measure the 
progress of event time. 
-For example, a window operator that builds hourly windows needs to be notified when event time has reached the
-next full hour, so that the operator can close the window in progress.
-
-*Event Time* can progress independently of *Processing Time* (measured by wall clocks).
-For example, in one program, the current *event time* of an operator can trail 
slightly behind the processing time
-(accounting for a delay in receiving the latest elements) and both proceed at 
the same speed. In another streaming
-program, which reads fast-forward through some data already buffered in a 
Kafka topic (or another message queue), event time
-can progress by weeks in seconds.
-
-------
-
-The mechanism in Flink to measure progress in event time is **Watermarks**.
-Watermarks flow as part of the data stream and carry a timestamp *t*. A 
*Watermark(t)* declares that event time has reached time
-*t* in that stream, meaning that all events with a timestamp *t' < t* have occurred.
-
-The figure below shows a stream of events with (logical) timestamps, and 
watermarks flowing inline. The events are in order
-(with respect to their timestamp), meaning that watermarks are simply periodic 
markers in the stream with an in-order timestamp.
-
-<img src="fig/stream_watermark_in_order.svg" alt="A data stream with events 
(in order) and watermarks" class="center" width="65%" />
-
-Watermarks are crucial for *out-of-order* streams, as shown in the figure below, where events do not occur ordered by their timestamp.
-Watermarks establish points in the stream where all events up to a certain 
timestamp have occurred. Once these watermarks reach an
-operator, the operator can advance its internal *event time clock* to the 
value of the watermark.
-
-<img src="fig/stream_watermark_out_of_order.svg" alt="A data stream with 
events (out of order) and watermarks" class="center" width="65%" />
-
-
-## Watermarks in Parallel Streams
-
-Watermarks are generated at source functions, or directly after source 
functions. Each parallel subtask of a source function usually
-generates its watermarks independently. These watermarks define the event time 
at that particular parallel source.
-
-As the watermarks flow through the streaming program, they advance the event 
time at the operators where they arrive. Whenever an
-operator advances its event time, it generates a new watermark downstream for 
its successor operators.
-
-Operators that consume multiple input streams (e.g., after a *keyBy(...)* or 
*partition(...)* function, or a union) track the event time
-on each of their input streams. The operator's current event time is the 
minimum of the input streams' event time. As the input streams
-update their event time, so does the operator.
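The minimum rule for operators with several inputs can be sketched in plain Java. This is only an illustration of the idea under simplifying assumptions, not Flink's internal implementation; the class `InputWatermarkTracker` and its methods are hypothetical names.

```java
import java.util.Arrays;

public class InputWatermarkTracker {

    // Last watermark seen on each input; Long.MIN_VALUE means "none yet".
    private final long[] inputWatermarks;

    // The operator's current event time (minimum over all inputs).
    private long currentWatermark = Long.MIN_VALUE;

    /** Creates a tracker for an operator with numInputs inputs (must be at least 1). */
    public InputWatermarkTracker(int numInputs) {
        if (numInputs < 1) {
            throw new IllegalArgumentException("need at least one input");
        }
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark received on one input. Returns true if the
     * operator's event time advanced, in which case a new watermark
     * would be forwarded downstream.
     */
    public boolean onWatermark(int input, long timestamp) {
        // Watermarks never move backwards on a single input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return true;
        }
        return false;
    }

    public long currentWatermark() {
        return currentWatermark;
    }
}
```

Taking the minimum means the slowest input determines the operator's event time, so no input can be overtaken by a watermark from a faster one.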
-
-The figure below shows an example of events and watermarks flowing through 
parallel streams, and operators tracking event time.
-
-<img src="fig/parallel_streams_watermarks.svg" alt="Parallel data streams and 
operators with events and watermarks" class="center" width="80%" />
-
-
-## Late Elements
-
-It is possible that certain elements violate the watermark condition, meaning 
that even after the *Watermark(t)* has occurred,
-more elements with timestamp *t' < t* will occur. In fact, in many real world 
setups, certain elements can be arbitrarily
-delayed, making it impossible to define a time when all elements of a certain 
event timestamp have occurred.
-Furthermore, even if the lateness can be bounded, delaying the watermarks by too much is often not desirable, because it delays
-the evaluation of the event time windows by too much.
-
-Due to that, some streaming programs will explicitly expect a number of *late* 
elements. Late elements are elements that
-arrive after the system's event time clock (as signaled by the watermarks) has 
already passed the time of the late element's
-timestamp.
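As a rough illustration of the definition above, the following sketch classifies an element as late when its timestamp lies before the most recent watermark. It is a standalone example, not Flink code; `LateElementCounter` and its methods are hypothetical names, and the comparison `t' < t` follows the wording of this section.

```java
public class LateElementCounter {

    // The event time clock, as signaled by the watermarks seen so far.
    private long currentWatermark = Long.MIN_VALUE;

    // Number of elements that arrived behind the event time clock.
    private int lateCount = 0;

    /** Advances the event time clock; watermarks never move it backwards. */
    public void onWatermark(long timestamp) {
        currentWatermark = Math.max(currentWatermark, timestamp);
    }

    /** Returns true if the element is late, i.e. its timestamp is before the watermark. */
    public boolean onElement(long timestamp) {
        boolean late = timestamp < currentWatermark;
        if (late) {
            lateCount++;
        }
        return late;
    }

    public int lateCount() {
        return lateCount;
    }
}
```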
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/event_timestamp_extractors.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/event_timestamp_extractors.md 
b/docs/apis/streaming/event_timestamp_extractors.md
deleted file mode 100644
index 83a90d2..0000000
--- a/docs/apis/streaming/event_timestamp_extractors.md
+++ /dev/null
@@ -1,108 +0,0 @@
----
-title: "Pre-defined Timestamp Extractors / Watermark Emitters"
-
-sub-nav-group: streaming
-sub-nav-pos: 2
-sub-nav-parent: eventtime
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* toc
-{:toc}
-
-As described in [timestamps and watermark handling]({{ site.baseurl 
}}/apis/streaming/event_timestamps_watermarks.html),
-Flink provides abstractions that allow the programmer to assign their own 
timestamps and emit their own watermarks. More specifically, 
-one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and 
`AssignerWithPunctuatedWatermarks` interfaces, depending 
-on their use-case. In a nutshell, the first will emit watermarks periodically, 
while the second does so based on some property of 
-the incoming records, e.g. whenever a special element is encountered in the 
stream.
-
-In order to further ease the programming effort for such tasks, Flink comes 
with some pre-implemented timestamp assigners. 
-This section provides a list of them. Apart from their out-of-the-box 
functionality, their implementation can serve as an example 
-for custom assigner implementations.
-
-#### **Assigner with Ascending Timestamps**
-
-The simplest special case for *periodic* watermark generation is the case 
where timestamps seen by a given source task 
-occur in ascending order. In that case, the current timestamp can always act 
as a watermark, because no earlier timestamps will 
-arrive.
-
-Note that it is only necessary that timestamps are ascending *per parallel 
data source task*. For example, if
-in a specific setup one Kafka partition is read by one parallel data source 
instance, then it is only necessary that
-timestamps are ascending within each Kafka partition. Flink's Watermark 
merging mechanism will generate correct
-watermarks whenever parallel streams are shuffled, unioned, connected, or 
merged.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<MyEvent> stream = ...
-
-DataStream<MyEvent> withTimestampsAndWatermarks = 
-    stream.assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor<MyEvent>() {
-
-        @Override
-        public long extractAscendingTimestamp(MyEvent element) {
-            return element.getCreationTime();
-        }
-});
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val stream: DataStream[MyEvent] = ...
-
-val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( 
_.getCreationTime )
-{% endhighlight %}
-</div>
-</div>
-
-#### **Assigner which allows a fixed amount of record lateness**
-
-Another example of periodic watermark generation is when the watermark lags 
behind the maximum (event-time) timestamp
-seen in the stream by a fixed amount of time. This case covers scenarios where 
the maximum lateness that can be encountered in a 
-stream is known in advance, e.g. when creating a custom source containing 
elements with timestamps spread within a fixed period of 
-time for testing. For these cases, Flink provides the 
`BoundedOutOfOrdernessTimestampExtractor` which takes as an argument 
-the `maxOutOfOrderness`, i.e. the maximum amount of time an element is allowed 
to be late before being ignored when computing the 
-final result for the given window. Lateness corresponds to the result of `t_w - t`, where `t` is the (event-time) timestamp of an
-element, and `t_w` that of the previous watermark. If `lateness > 0` then the element is considered late and is ignored when computing
-the result of the job for its corresponding window.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<MyEvent> stream = ...
-
-DataStream<MyEvent> withTimestampsAndWatermarks = 
-    stream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
-
-        @Override
-        public long extractTimestamp(MyEvent element) {
-            return element.getCreationTime();
-        }
-});
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val stream: DataStream[MyEvent] = ...
-
-val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
-  new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10)) {
-    override def extractTimestamp(element: MyEvent): Long = element.getCreationTime
-  })
-{% endhighlight %}
-</div>
-</div>
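The idea behind a watermark that lags the maximum timestamp by a fixed amount can be sketched without any Flink dependency. This is a simplified illustration, not the source of `BoundedOutOfOrdernessTimestampExtractor`; the class `BoundedOutOfOrdernessSketch` and its methods are hypothetical names.

```java
public class BoundedOutOfOrdernessSketch {

    // How far the watermark trails the highest timestamp seen, in milliseconds.
    private final long maxOutOfOrdernessMs;

    // Highest event timestamp observed so far.
    private long maxTimestampSeen;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("bound must not be negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start offset by the bound so the first watermark does not underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Updates the maximum timestamp with the timestamp of a new element. */
    public void onElement(long timestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
    }

    /** The watermark a periodic emitter would report at this point. */
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }
}
```

With a bound of 10 seconds, an element may trail the newest timestamp by up to 10 seconds and still be ahead of the watermark; anything older falls behind it.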

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/event_timestamps_watermarks.md 
b/docs/apis/streaming/event_timestamps_watermarks.md
deleted file mode 100644
index 05c9f51..0000000
--- a/docs/apis/streaming/event_timestamps_watermarks.md
+++ /dev/null
@@ -1,332 +0,0 @@
----
-title: "Generating Timestamps / Watermarks"
-
-sub-nav-group: streaming
-sub-nav-pos: 1
-sub-nav-parent: eventtime
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* toc
-{:toc}
-
-
-This section is relevant for programs running on **Event Time**. For an introduction to *Event Time*,
-*Processing Time*, and *Ingestion Time*, please refer to the [event time introduction]({{ site.baseurl }}/apis/streaming/event_time.html).
-
-To work with *Event Time*, streaming programs need to set the *time 
characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *Event Time*, Flink needs to know the events' 
*timestamps*, meaning each element in the
-stream needs to get its event timestamp *assigned*. This is usually done by accessing/extracting the
-timestamp from some field in the element.
-
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell 
the system about
-the progress in event time.
-
-There are two ways to assign timestamps and generate Watermarks:
-
-  1. Directly in the data stream source
-  2. Via a timestamp assigner / watermark generator: in Flink timestamp 
assigners also define the watermarks to be emitted
-
-<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as
-milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
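As a small illustration of this unit convention, the sketch below converts an ISO-8601 instant into epoch milliseconds using only the JDK. The class `EpochMillis` and the method `toEventTimestamp` are hypothetical names chosen for this example; how a real record stores its time is an assumption.

```java
import java.time.Instant;

public class EpochMillis {

    /**
     * Converts an ISO-8601 instant such as "1970-01-01T01:00:00Z" into
     * milliseconds since 1970-01-01T00:00:00Z, the unit expected for
     * timestamps and watermarks.
     */
    public static long toEventTimestamp(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }
}
```

A record that carries its time as seconds or as a formatted string would need a conversion like this before the value is used as an event timestamp.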
-
-### Source Functions with Timestamps and Watermarks
-
-Stream sources can also directly assign timestamps to the elements they 
produce and emit Watermarks. In that case,
-no Timestamp Assigner is needed.
-
-To assign a timestamp to an element in the source directly, the source must 
use the `collectWithTimestamp(...)`
-method on the `SourceContext`. To generate Watermarks, the source must call 
the `emitWatermark(Watermark)` function.
-
-Below is a simple example of a source *(non-checkpointed)* that assigns 
timestamps and generates Watermarks
-depending on special events:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-@Override
-public void run(SourceContext<MyType> ctx) throws Exception {
-       while (/* condition */) {
-               MyType next = getNext();
-               ctx.collectWithTimestamp(next, next.getEventTimestamp());
-
-               if (next.hasWatermarkTime()) {
-                       ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
-               }
-       }
-}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-override def run(ctx: SourceContext[MyType]): Unit = {
-       while (/* condition */) {
-               val next: MyType = getNext()
-               ctx.collectWithTimestamp(next, next.eventTimestamp)
-
-               if (next.hasWatermarkTime) {
-                       ctx.emitWatermark(new Watermark(next.getWatermarkTime))
-               }
-       }
-}
-{% endhighlight %}
-</div>
-</div>
-
-*Note:* If the streaming program uses a TimestampAssigner on a stream where 
elements have a timestamp already,
-those timestamps will be overwritten by the TimestampAssigner. Similarly, 
Watermarks will be overwritten as well.
-
-
-### Timestamp Assigners / Watermark Generators
-
-Timestamp Assigners take a stream and produce a new stream with timestamped 
elements and watermarks. If the
-original stream had timestamps and/or watermarks already, the timestamp 
assigner overwrites them.
-
-Timestamp assigners are usually specified immediately after the data 
source, but it is not strictly required to do so.
-A common pattern is, for example, to parse (*MapFunction*) and filter 
(*FilterFunction*) before the timestamp assigner.
-In any case, the timestamp assigner needs to be specified before the first 
operation on event time
-(such as the first window operation). As a special case, when using Kafka as 
the source of a streaming job,
-Flink allows the specification of a timestamp assigner / watermark emitter 
inside
-the source (or consumer) itself. More information on how to do so can be found 
in the
-[Kafka Connector documentation]({{ site.baseurl 
}}/apis/streaming/connectors/kafka.html).
-
-
-**NOTE:** The remainder of this section presents the main interfaces a 
programmer has
-to implement in order to create her own timestamp extractors/watermark 
emitters.
-To see the pre-implemented extractors that ship with Flink, please refer to the
-[Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl 
}}/apis/streaming/event_timestamp_extractors.html) page.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-DataStream<MyEvent> stream = env.readFile(
-        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
-        FilePathFilter.createDefaultFilter(), typeInfo);
-
-DataStream<MyEvent> withTimestampsAndWatermarks = stream
-        .filter( event -> event.severity() == WARNING )
-        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
-
-withTimestampsAndWatermarks
-        .keyBy( (event) -> event.getGroup() )
-        .timeWindow(Time.seconds(10))
-        .reduce( (a, b) -> a.add(b) )
-        .addSink(...);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-val stream: DataStream[MyEvent] = env.readFile(
-         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
-         FilePathFilter.createDefaultFilter());
-
-val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
-        .filter( _.severity == WARNING )
-        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
-
-withTimestampsAndWatermarks
-        .keyBy( _.getGroup )
-        .timeWindow(Time.seconds(10))
-        .reduce( (a, b) => a.add(b) )
-        .addSink(...)
-{% endhighlight %}
-</div>
-</div>
-
-
-#### **With Periodic Watermarks**
-
-The `AssignerWithPeriodicWatermarks` assigns timestamps and generates 
watermarks periodically (possibly depending
-on the stream elements, or purely based on processing time).
-
-The interval (every *n* milliseconds) in which the watermark will be generated 
is defined via
-`ExecutionConfig.setAutoWatermarkInterval(...)`. Each time, the assigner's 
`getCurrentWatermark()` method will be
-called, and a new Watermark will be emitted, if the returned Watermark is 
non-null and larger than the previous
-Watermark.
-
-Two simple examples of timestamp assigners with periodic watermark generation 
are below.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-/**
- * This generator generates watermarks assuming that elements come out of 
order to a certain degree only.
- * The latest elements for a certain timestamp t will arrive at most n 
milliseconds after the earliest
- * elements for timestamp t.
- */
-public class BoundedOutOfOrdernessGenerator implements 
AssignerWithPeriodicWatermarks<MyEvent> {
-
-    private final long maxOutOfOrderness = 3500; // 3.5 seconds
-
-    private long currentMaxTimestamp;
-
-    @Override
-    public long extractTimestamp(MyEvent element, long 
previousElementTimestamp) {
-        long timestamp = element.getCreationTime();
-        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
-        return timestamp;
-    }
-
-    @Override
-    public Watermark getCurrentWatermark() {
-        // return the watermark as current highest timestamp minus the 
out-of-orderness bound
-        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
-    }
-}
-
-/**
- * This generator generates watermarks that are lagging behind processing time 
by a certain amount.
- * It assumes that elements arrive in Flink after at most a certain time.
- */
-public class TimeLagWatermarkGenerator implements 
AssignerWithPeriodicWatermarks<MyEvent> {
-
-       private final long maxTimeLag = 5000; // 5 seconds
-
-       @Override
-       public long extractTimestamp(MyEvent element, long 
previousElementTimestamp) {
-               return element.getCreationTime();
-       }
-
-       @Override
-       public Watermark getCurrentWatermark() {
-               // return the watermark as current time minus the maximum time 
lag
-               return new Watermark(System.currentTimeMillis() - maxTimeLag);
-       }
-}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-/**
- * This generator generates watermarks assuming that elements come out of 
order to a certain degree only.
- * The latest elements for a certain timestamp t will arrive at most n 
milliseconds after the earliest
- * elements for timestamp t.
- */
-class BoundedOutOfOrdernessGenerator extends 
AssignerWithPeriodicWatermarks[MyEvent] {
-
-    val maxOutOfOrderness = 3500L; // 3.5 seconds
-
-    var currentMaxTimestamp: Long = 0L
-
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: 
Long): Long = {
-        val timestamp = element.getCreationTime()
-        currentMaxTimestamp = math.max(timestamp, currentMaxTimestamp)
-        timestamp;
-    }
-
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current highest timestamp minus the 
out-of-orderness bound
-        new Watermark(currentMaxTimestamp - maxOutOfOrderness);
-    }
-}
-
-/**
- * This generator generates watermarks that are lagging behind processing time 
by a certain amount.
- * It assumes that elements arrive in Flink after at most a certain time.
- */
-class TimeLagWatermarkGenerator extends 
AssignerWithPeriodicWatermarks[MyEvent] {
-
-    val maxTimeLag = 5000L; // 5 seconds
-
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: 
Long): Long = {
-        element.getCreationTime
-    }
-
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current time minus the maximum time lag
-        new Watermark(System.currentTimeMillis() - maxTimeLag)
-    }
-}
-{% endhighlight %}
-</div>
-</div>
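The emission rule described for periodic watermarks (a new watermark is forwarded only if it is larger than the previous one) can be illustrated without any Flink dependency. The following is a minimal plain-Java sketch, not Flink's implementation: the class `WatermarkSketch` and its methods `onElement`, `onPeriodicTick`, and `run` are hypothetical names introduced for illustration, and each inner array passed to `run` stands for the element timestamps that arrive between two watermark ticks.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of bounded out-of-orderness watermarks; all names are hypothetical. */
final class WatermarkSketch {

    private final long maxOutOfOrderness;               // allowed lateness in milliseconds
    private long currentMaxTimestamp = Long.MIN_VALUE;  // highest event timestamp seen so far
    private long lastEmittedWatermark = Long.MIN_VALUE; // most recently emitted watermark

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Plays the role of extractTimestamp(...): remember the highest timestamp. */
    void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    /** Plays the role of one periodic tick: returns a watermark only if it advances, else null. */
    Long onPeriodicTick() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return null; // no element seen yet, nothing to emit
        }
        long candidate = currentMaxTimestamp - maxOutOfOrderness;
        if (candidate <= lastEmittedWatermark) {
            return null; // not larger than the previous watermark, so it is suppressed
        }
        lastEmittedWatermark = candidate;
        return candidate;
    }

    /** Feeds each batch of timestamps, ticks once per batch, and collects the emitted watermarks. */
    static List<Long> run(long bound, long[][] batches) {
        WatermarkSketch sketch = new WatermarkSketch(bound);
        List<Long> emitted = new ArrayList<>();
        for (long[] batch : batches) {
            for (long ts : batch) {
                sketch.onElement(ts);
            }
            Long watermark = sketch.onPeriodicTick();
            if (watermark != null) {
                emitted.add(watermark);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[][] batches = {{1000L, 4000L}, {3000L}, {9000L}};
        System.out.println(run(3500L, batches));
    }
}
```

With a bound of 3500 ms, the second tick in this run emits nothing, because the late element with timestamp 3000 does not raise the maximum timestamp and the candidate watermark therefore does not advance.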
-
-#### **With Punctuated Watermarks**
-
-To generate Watermarks whenever a certain event indicates that a new watermark 
can be generated, use the
-`AssignerWithPunctuatedWatermarks`. For this class, Flink will first call the 
`extractTimestamp(...)` method
-to assign the element a timestamp, and then immediately call for that element 
the
-`checkAndGetNextWatermark(...)` method.
-
-The `checkAndGetNextWatermark(...)` method gets the timestamp that was 
assigned in the `extractTimestamp(...)`
-method, and can decide whether it wants to generate a Watermark. Whenever the 
`checkAndGetNextWatermark(...)`
-method returns a non-null Watermark, and that Watermark is larger than the 
latest previous Watermark, that
-new Watermark will be emitted.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public class PunctuatedAssigner implements 
AssignerWithPunctuatedWatermarks<MyEvent> {
-
-       @Override
-       public long extractTimestamp(MyEvent element, long 
previousElementTimestamp) {
-               return element.getCreationTime();
-       }
-
-       @Override
-       public Watermark checkAndGetNextWatermark(MyEvent lastElement, long 
extractedTimestamp) {
-               return lastElement.hasWatermarkMarker() ? new 
Watermark(extractedTimestamp) : null;
-       }
-}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
-
-       override def extractTimestamp(element: MyEvent, 
previousElementTimestamp: Long): Long = {
-               element.getCreationTime
-       }
-
-       override def checkAndGetNextWatermark(lastElement: MyEvent, 
extractedTimestamp: Long): Watermark = {
-               if (lastElement.hasWatermarkMarker()) new 
Watermark(extractedTimestamp) else null
-       }
-}
-{% endhighlight %}
-</div>
-</div>
-
-*Note:* It is possible to generate a watermark on every single event. However, 
because each watermark causes some
-computation downstream, an excessive number of watermarks degrades 
performance.
-

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md 
b/docs/apis/streaming/fault_tolerance.md
deleted file mode 100644
index 99221e5..0000000
--- a/docs/apis/streaming/fault_tolerance.md
+++ /dev/null
@@ -1,462 +0,0 @@
----
-title: "Fault Tolerance"
-is_beta: false
-
-sub-nav-group: streaming
-sub-nav-id: fault_tolerance
-sub-nav-pos: 5
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Flink's fault tolerance mechanism recovers programs in the presence of 
failures and
-continues to execute them. Such failures include machine hardware failures, 
network failures,
-transient program failures, etc.
-
-* This will be replaced by the TOC
-{:toc}
-
-
-Streaming Fault Tolerance
--------------------------
-
-Flink has a checkpointing mechanism that recovers streaming jobs after 
failures. The checkpointing mechanism requires a *persistent* (or *durable*) 
source that
-can be asked for prior records again (Apache Kafka is a good example of such a 
source).
-
-The checkpointing mechanism stores the progress in the data sources and data 
sinks, the state of windows, as well as the user-defined state (see [Working 
with State](state.html)) consistently to provide *exactly once* processing 
semantics. Where the checkpoints are stored (e.g., JobManager memory, file 
system, database) depends on the configured [state 
backend](state_backends.html).
-
-The [docs on streaming fault tolerance]({{ site.baseurl 
}}/internals/stream_checkpointing.html) describe in detail the technique behind 
Flink's streaming fault tolerance mechanism.
-
-To enable checkpointing, call `enableCheckpointing(n)` on the 
`StreamExecutionEnvironment`, where *n* is the checkpoint interval in 
milliseconds.
-
-Other parameters for checkpointing include:
-
-- *Number of retries*: The `setNumberOfExecutionRetries()` method defines how 
many times the job is restarted after a failure.
-  When checkpointing is activated, but this value is not explicitly set, the 
job is restarted infinitely often.
-
-- *exactly-once vs. at-least-once*: You can optionally pass a mode to the 
`enableCheckpointing(n)` method to choose between the two guarantee levels.
-  Exactly-once is preferable for most applications. At-least-once may be 
relevant for certain super-low-latency (consistently few milliseconds) 
applications.
-
-- *number of concurrent checkpoints*: By default, the system will not trigger 
another checkpoint while one is still in progress. This ensures that the 
topology does not spend so much time on checkpoints that it fails to make 
progress processing the streams. It is possible to allow multiple overlapping 
checkpoints. This is useful for pipelines that have a certain processing 
delay (for example, because the functions call external services that need some 
time to respond) but that still want to checkpoint very frequently (every few 
hundred milliseconds) so that very little is re-processed after a failure.
-
-- *checkpoint timeout*: The time after which a checkpoint-in-progress is 
aborted, if it did not complete by then.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-// start a checkpoint every 1000 ms
-env.enableCheckpointing(1000);
-
-// advanced options:
-
-// set mode to exactly-once (this is the default)
-env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-
-// checkpoints have to complete within one minute, or are discarded
-env.getCheckpointConfig().setCheckpointTimeout(60000);
-
-// allow only one checkpoint to be in progress at the same time
-env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-// start a checkpoint every 1000 ms
-env.enableCheckpointing(1000)
-
-// advanced options:
-
-// set mode to exactly-once (this is the default)
-env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
-
-// checkpoints have to complete within one minute, or are discarded
-env.getCheckpointConfig.setCheckpointTimeout(60000)
-
-// allow only one checkpoint to be in progress at the same time
-env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-### Fault Tolerance Guarantees of Data Sources and Sinks
-
-Flink can guarantee exactly-once state updates to user-defined state only when 
the source participates in the
-snapshotting mechanism. The following table lists the state update guarantees 
of Flink coupled with the bundled connectors.
-
-Please read the documentation of each connector to understand the details of 
the fault tolerance guarantees.
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Source</th>
-      <th class="text-left" style="width: 25%">Guarantees</th>
-      <th class="text-left">Notes</th>
-    </tr>
-   </thead>
-   <tbody>
-        <tr>
-            <td>Apache Kafka</td>
-            <td>exactly once</td>
-            <td>Use the appropriate Kafka connector for your version</td>
-        </tr>
-        <tr>
-            <td>AWS Kinesis Streams</td>
-            <td>exactly once</td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>RabbitMQ</td>
-            <td>at most once (v 0.10) / exactly once (v 1.0) </td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>Twitter Streaming API</td>
-            <td>at most once</td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>Collections</td>
-            <td>exactly once</td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>Files</td>
-            <td>exactly once</td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>Sockets</td>
-            <td>at most once</td>
-            <td></td>
-        </tr>
-  </tbody>
-</table>
-
-To guarantee end-to-end exactly-once record delivery (in addition to 
exactly-once state semantics), the data sink needs
-to take part in the checkpointing mechanism. The following table lists the 
delivery guarantees (assuming exactly-once
-state updates) of Flink coupled with bundled sinks:
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Sink</th>
-      <th class="text-left" style="width: 25%">Guarantees</th>
-      <th class="text-left">Notes</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>HDFS rolling sink</td>
-        <td>exactly once</td>
-        <td>Implementation depends on Hadoop version</td>
-    </tr>
-    <tr>
-        <td>Elasticsearch</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Kafka producer</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Cassandra sink</td>
-        <td>at least once / exactly once</td>
-        <td>exactly once only for idempotent updates</td>
-    </tr>
-    <tr>
-        <td>AWS Kinesis Streams</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>File sinks</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Socket sinks</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Standard output</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Redis sink</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-  </tbody>
-</table>
-
-{% top %}
-
-## Restart Strategies
-
-Flink supports different restart strategies which control how the jobs are 
restarted in case of a failure.
-The cluster can be started with a default restart strategy which is always 
used when no job specific restart strategy has been defined.
-If the job is submitted with a restart strategy, this strategy 
overrides the cluster's default setting.
- 
-The default restart strategy is set via Flink's configuration file 
`flink-conf.yaml`.
-The configuration parameter *restart-strategy* defines which strategy is used.
-By default, the no-restart strategy is used.
-See the following list of available restart strategies to learn what values 
are supported.
-
-Each restart strategy comes with its own set of parameters which control its 
behaviour.
-These values are also set in the configuration file.
-The description of each restart strategy contains more information about the 
respective configuration values.
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 50%">Restart Strategy</th>
-      <th class="text-left">Value for restart-strategy</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>Fixed delay</td>
-        <td>fixed-delay</td>
-    </tr>
-    <tr>
-        <td>Failure rate</td>
-        <td>failure-rate</td>
-    </tr>
-    <tr>
-        <td>No restart</td>
-        <td>none</td>
-    </tr>
-  </tbody>
-</table>
-
-Apart from defining a default restart strategy, it is possible to define a 
specific restart strategy for each Flink job.
-This restart strategy is set programmatically by calling the 
`setRestartStrategy` method on the `ExecutionEnvironment`.
-Note that this also works for the `StreamExecutionEnvironment`.
-
-The following example shows how we can set a fixed delay restart strategy for 
our job.
-In case of a failure the system tries to restart the job 3 times and waits 10 
seconds in-between successive restart attempts.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts 
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts 
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-### Fixed Delay Restart Strategy
-
-The fixed delay restart strategy attempts to restart the job a given number 
of times.
-If the maximum number of attempts is exceeded, the job eventually fails.
-In-between two consecutive restart attempts, the restart strategy waits a 
fixed amount of time.
-
-This strategy is enabled as the default by setting the following configuration 
parameter in `flink-conf.yaml`.
-
-~~~
-restart-strategy: fixed-delay
-~~~
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 40%">Configuration Parameter</th>
-      <th class="text-left" style="width: 40%">Description</th>
-      <th class="text-left">Default Value</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td><it>restart-strategy.fixed-delay.attempts</it></td>
-        <td>Number of restart attempts</td>
-        <td>1</td>
-    </tr>
-    <tr>
-        <td><it>restart-strategy.fixed-delay.delay</it></td>
-        <td>Delay between two consecutive restart attempts</td>
-        <td><it>akka.ask.timeout</it></td>
-    </tr>
-  </tbody>
-</table>
-
-~~~
-restart-strategy.fixed-delay.attempts: 3
-restart-strategy.fixed-delay.delay: 10 s
-~~~
-
-The fixed delay restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts 
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts 
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
-
-#### Restart Attempts
-
-The number of times that Flink retries the execution before the job is 
declared as failed is configurable via the 
*restart-strategy.fixed-delay.attempts* parameter.
-
-The default value is **1**.
-
-#### Retry Delays
-
-Execution retries can be configured to be delayed. Delaying the retry means 
that after a failed execution, the re-execution does not start immediately, but 
only after a certain delay.
-
-Delaying the retries can be helpful when the program interacts with external 
systems where for example connections or pending transactions should reach a 
timeout before re-execution is attempted.
-
-The default value is the value of *akka.ask.timeout*.
-
-{% top %}
-
-### Failure Rate Restart Strategy
-
-The failure rate restart strategy restarts the job after a failure, but when the 
*failure rate* (failures per time interval) is exceeded, the job eventually 
fails.
-In-between two consecutive restart attempts, the restart strategy waits a 
fixed amount of time.
-
-This strategy is enabled as the default by setting the following configuration 
parameter in `flink-conf.yaml`.
-
-~~~
-restart-strategy: failure-rate
-~~~
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 40%">Configuration Parameter</th>
-      <th class="text-left" style="width: 40%">Description</th>
-      <th class="text-left">Default Value</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        
<td><it>restart-strategy.failure-rate.max-failures-per-interval</it></td>
-        <td>Maximum number of restarts in a given time interval before failing a 
job</td>
-        <td>1</td>
-    </tr>
-    <tr>
-        <td><it>restart-strategy.failure-rate.failure-rate-interval</it></td>
-        <td>Time interval for measuring failure rate.</td>
-        <td>1 minute</td>
-    </tr>
-    <tr>
-        <td><it>restart-strategy.failure-rate.delay</it></td>
-        <td>Delay between two consecutive restart attempts</td>
-        <td><it>akka.ask.timeout</it></td>
-    </tr>
-  </tbody>
-</table>
-
-~~~
-restart-strategy.failure-rate.max-failures-per-interval: 3
-restart-strategy.failure-rate.failure-rate-interval: 5 min
-restart-strategy.failure-rate.delay: 10 s
-~~~
-
-The failure rate restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.failureRateRestart(
-  3, // max failures per interval
-  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-env.setRestartStrategy(RestartStrategies.failureRateRestart(
-  3, // max failures per interval
-  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
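To make the notion of a failure rate concrete, the decision "restart or give up" can be sketched as a sliding-window counter. This is a plain-Java illustration under stated assumptions, not Flink's implementation: the class `FailureRateSketch` and its method `onFailure` are hypothetical names, the exact handling of the window boundary is an assumption, and the delay between restart attempts is not modelled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Sliding-window failure counter; a hypothetical illustration, not Flink code. */
final class FailureRateSketch {

    private final int maxFailuresPerInterval; // allowed failures inside one window
    private final long intervalMillis;        // length of the measuring window
    private final Deque<Long> recentFailures = new ArrayDeque<>();

    FailureRateSketch(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /**
     * Records a failure observed at nowMillis.
     * Returns true if a restart may be attempted, false if the failure rate is exceeded.
     */
    boolean onFailure(long nowMillis) {
        // Forget failures that are at least one full interval old (boundary choice is an assumption).
        while (!recentFailures.isEmpty()
                && nowMillis - recentFailures.peekFirst() >= intervalMillis) {
            recentFailures.pollFirst();
        }
        recentFailures.addLast(nowMillis);
        return recentFailures.size() <= maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        // 3 failures per 5 minutes, matching the configuration values shown above.
        FailureRateSketch strategy = new FailureRateSketch(3, 5 * 60 * 1000L);
        long[] failureTimes = {0L, 60_000L, 120_000L, 180_000L};
        for (long t : failureTimes) {
            System.out.println("failure at " + t + " ms, restart allowed: " + strategy.onFailure(t));
        }
    }
}
```

In this sketch a fourth failure within the same five-minute window is the first one for which no restart is allowed; if the failures are spread out so that older ones leave the window, restarts continue indefinitely.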
-
-{% top %}
-
-### No Restart Strategy
-
-The job fails directly and no restart is attempted.
-
-~~~
-restart-strategy: none
-~~~
-
-The no restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.noRestart());
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-env.setRestartStrategy(RestartStrategies.noRestart())
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
