http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/pom.xml 
b/flink-connectors/flink-connector-elasticsearch2/pom.xml
new file mode 100644
index 0000000..5fcb05e
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
+       <name>flink-connector-elasticsearch2</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <elasticsearch.version>2.3.5</elasticsearch.version>
+       </properties>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+ 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.elasticsearch</groupId>
+                       <artifactId>elasticsearch</artifactId>
+                       <version>${elasticsearch.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.fasterxml.jackson.core</groupId>
+                       <artifactId>jackson-core</artifactId>
+               </dependency>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+       </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
new file mode 100644
index 0000000..650931f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+public class BulkProcessorIndexer implements RequestIndexer {
+       private final BulkProcessor bulkProcessor;
+
+       public BulkProcessorIndexer(BulkProcessor bulkProcessor) {
+               this.bulkProcessor = bulkProcessor;
+       }
+
+       @Override
+       public void add(ActionRequest... actionRequests) {
+               for (ActionRequest actionRequest : actionRequests) {
+                       this.bulkProcessor.add(actionRequest);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
new file mode 100644
index 0000000..e839589
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements in bulk to an Elasticsearch cluster.
+ *
+ * <p>
+ * When using the second constructor
+ * {@link #ElasticsearchSink(java.util.Map, java.util.List, 
ElasticsearchSinkFunction)} a {@link TransportClient} will
+ * be used.
+ *
+ * <p>
+ * <b>Attention: </b> When using the {@code TransportClient} the sink will 
fail if no cluster
+ * can be connected to.
+ *
+ * <p>
+ * The {@link Map} passed to the constructor is forwarded to Elasticsearch 
when creating
+ * {@link TransportClient}. The config keys can be found in the Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, this should be 
set to the name
+ * of the cluster that the sink should emit to.
+ *
+ * <p>
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link 
IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ *   <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ *   settings in milliseconds
+ * </ul>
+ *
+ * <p>
+ * You also have to provide an {@link RequestIndexer}. This is used to create 
an
+ * {@link IndexRequest} from an element that needs to be added to 
Elasticsearch. See
+ * {@link RequestIndexer} for an example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T>  {
+
+       public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+       public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+       public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchSink.class);
+
+       /**
+        * The user specified config map that we forward to Elasticsearch when 
we create the Client.
+        */
+       private final Map<String, String> userConfig;
+
+       /**
+        * The list of nodes that the TransportClient should connect to. This 
is null if we are using
+        * an embedded Node to get a Client.
+        */
+       private final List<InetSocketAddress> transportAddresses;
+
+       /**
+        * The builder that is used to construct an {@link IndexRequest} from 
the incoming element.
+        */
+       private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+       /**
+        * The Client that was either retrieved from a Node or is a 
TransportClient.
+        */
+       private transient Client client;
+
+       /**
+        * Bulk processor that was created using the client
+        */
+       private transient BulkProcessor bulkProcessor;
+
+       /**
+        * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
+        */
+       private transient RequestIndexer requestIndexer;
+
+       /**
+        * This is set from inside the BulkProcessor listener if there where 
failures in processing.
+        */
+       private final AtomicBoolean hasFailure = new AtomicBoolean(false);
+
+       /**
+        * This is set from inside the BulkProcessor listener if a Throwable 
was thrown during processing.
+        */
+       private final AtomicReference<Throwable> failureThrowable = new 
AtomicReference<>();
+
+       /**
+        * Creates a new ElasticsearchSink that connects to the cluster using a 
TransportClient.
+        *
+        * @param userConfig The map of user settings that are passed when 
constructing the TransportClient and BulkProcessor
+        * @param transportAddresses The Elasticsearch Nodes to which to 
connect using a {@code TransportClient}
+        * @param elasticsearchSinkFunction This is used to generate the 
ActionRequest from the incoming element
+        *
+        */
+       public ElasticsearchSink(Map<String, String> userConfig, 
List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) {
+               this.userConfig = userConfig;
+               this.elasticsearchSinkFunction = elasticsearchSinkFunction;
+               Preconditions.checkArgument(transportAddresses != null && 
transportAddresses.size() > 0);
+               this.transportAddresses = transportAddresses;
+       }
+
+       /**
+        * Initializes the connection to Elasticsearch by creating a
+        * {@link org.elasticsearch.client.transport.TransportClient}.
+        */
+       @Override
+       public void open(Configuration configuration) {
+               List<TransportAddress> transportNodes;
+               transportNodes = new ArrayList<>(transportAddresses.size());
+               for (InetSocketAddress address : transportAddresses) {
+                       transportNodes.add(new 
InetSocketTransportAddress(address));
+               }
+
+               Settings settings = 
Settings.settingsBuilder().put(userConfig).build();
+
+               TransportClient transportClient = 
TransportClient.builder().settings(settings).build();
+               for (TransportAddress transport: transportNodes) {
+                       transportClient.addTransportAddress(transport);
+               }
+
+               // verify that we actually are connected to a cluster
+               ImmutableList<DiscoveryNode> nodes = 
ImmutableList.copyOf(transportClient.connectedNodes());
+               if (nodes.isEmpty()) {
+                       throw new RuntimeException("Client is not connected to 
any Elasticsearch nodes!");
+               }
+
+               client = transportClient;
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Created Elasticsearch TransportClient {}", 
client);
+               }
+
+               BulkProcessor.Builder bulkProcessorBuilder = 
BulkProcessor.builder(client, new BulkProcessor.Listener() {
+                       @Override
+                       public void beforeBulk(long executionId, BulkRequest 
request) {
+
+                       }
+
+                       @Override
+                       public void afterBulk(long executionId, BulkRequest 
request, BulkResponse response) {
+                               if (response.hasFailures()) {
+                                       for (BulkItemResponse itemResp : 
response.getItems()) {
+                                               if (itemResp.isFailed()) {
+                                                       LOG.error("Failed to 
index document in Elasticsearch: " + itemResp.getFailureMessage());
+                                                       
failureThrowable.compareAndSet(null, new 
RuntimeException(itemResp.getFailureMessage()));
+                                               }
+                                       }
+                                       hasFailure.set(true);
+                               }
+                       }
+
+                       @Override
+                       public void afterBulk(long executionId, BulkRequest 
request, Throwable failure) {
+                               LOG.error(failure.getMessage());
+                               failureThrowable.compareAndSet(null, failure);
+                               hasFailure.set(true);
+                       }
+               });
+
+               // This makes flush() blocking
+               bulkProcessorBuilder.setConcurrentRequests(0);
+
+               ParameterTool params = ParameterTool.fromMap(userConfig);
+
+               if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+                       
bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
+               }
+
+               if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+                       bulkProcessorBuilder.setBulkSize(new 
ByteSizeValue(params.getInt(
+                                       CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), 
ByteSizeUnit.MB));
+               }
+
+               if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+                       
bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
+               }
+
+               bulkProcessor = bulkProcessorBuilder.build();
+               requestIndexer = new BulkProcessorIndexer(bulkProcessor);
+       }
+
+       @Override
+       public void invoke(T element) {
+               elasticsearchSinkFunction.process(element, getRuntimeContext(), 
requestIndexer);
+       }
+
+       @Override
+       public void close() {
+               if (bulkProcessor != null) {
+                       bulkProcessor.close();
+                       bulkProcessor = null;
+               }
+
+               if (client != null) {
+                       client.close();
+               }
+
+               if (hasFailure.get()) {
+                       Throwable cause = failureThrowable.get();
+                       if (cause != null) {
+                               throw new RuntimeException("An error occured in 
ElasticsearchSink.", cause);
+                       } else {
+                               throw new RuntimeException("An error occured in 
ElasticsearchSink.");
+                       }
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
new file mode 100644
index 0000000..55ba720
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+
+/**
+ * Method that creates an {@link org.elasticsearch.action.ActionRequest} from 
an element in a Stream.
+ *
+ * <p>
+ * This is used by {@link ElasticsearchSink} to prepare elements for sending 
them to Elasticsearch.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *                                     private static class 
TestElasticSearchSinkFunction implements
+ *                                             
ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+ *
+ *                                     public IndexRequest 
createIndexRequest(Tuple2<Integer, String> element) {
+ *                                             Map<String, Object> json = new 
HashMap<>();
+ *                                             json.put("data", element.f1);
+ *
+ *                                             return Requests.indexRequest()
+ *                                                     .index("my-index")
+ *                                                     .type("my-type")
+ *                                                     
.id(element.f0.toString())
+ *                                                     .source(json);
+ *                                             }
+ *
+ *                             public void process(Tuple2<Integer, String> 
element, RuntimeContext ctx, RequestIndexer indexer) {
+ *                                     
indexer.add(createIndexRequest(element));
+ *                             }
+ *             }
+ *
+ * }</pre>
+ *
+ * @param <T> The type of the element handled by this {@code 
ElasticsearchSinkFunction}
+ */
+public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
+       void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
new file mode 100644
index 0000000..144a87b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+public interface RequestIndexer extends Serializable {
+       void add(ActionRequest... actionRequests);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
new file mode 100644
index 0000000..bc9bedc
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase 
{
+
+       private static final int NUM_ELEMENTS = 20;
+
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testTransportClient() throws Exception {
+
+               File dataDir = tempFolder.newFolder();
+
+               Node node = NodeBuilder.nodeBuilder()
+                               .settings(Settings.settingsBuilder()
+                                               .put("path.home", 
dataDir.getParent())
+                                               .put("http.enabled", false)
+                                               .put("path.data", 
dataDir.getAbsolutePath()))
+                               // set a custom cluster name to verify that 
user config works correctly
+                               .clusterName("my-transport-client-cluster")
+                               .node();
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
+
+               Map<String, String> config = new HashMap<>();
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
+               config.put("cluster.name", "my-transport-client-cluster");
+
+               // Can't use {@link TransportAddress} as its not Serializable 
in Elasticsearch 2.x
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(config, transports, new 
TestElasticsearchSinkFunction()));
+
+               env.execute("Elasticsearch TransportClient Test");
+
+               // verify the results
+               Client client = node.client();
+               for (int i = 0; i < NUM_ELEMENTS; i++) {
+                       GetResponse response = client.get(new 
GetRequest("my-index",
+                                       "my-type", 
Integer.toString(i))).actionGet();
+                       Assert.assertEquals("message #" + i, 
response.getSource().get("data"));
+               }
+
+               node.close();
+       }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNullTransportClient() throws Exception {
+
+       File dataDir = tempFolder.newFolder();
+
+       Node node = NodeBuilder.nodeBuilder()
+               .settings(Settings.settingsBuilder()
+                       .put("path.home", dataDir.getParent())
+                       .put("http.enabled", false)
+                       .put("path.data", dataDir.getAbsolutePath()))
+               // set a custom cluster name to verify that user config works 
correctly
+               .clusterName("my-transport-client-cluster")
+               .node();
+
+       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+       DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction());
+
+       Map<String, String> config = new HashMap<>();
+       // This instructs the sink to emit after every element, otherwise they 
would be buffered
+       config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+       config.put("cluster.name", "my-transport-client-cluster");
+
+       source.addSink(new ElasticsearchSink<>(config, null, new 
TestElasticsearchSinkFunction()));
+
+       env.execute("Elasticsearch TransportClient Test");
+
+       // verify the results
+       Client client = node.client();
+       for (int i = 0; i < NUM_ELEMENTS; i++) {
+        GetResponse response = client.get(new GetRequest("my-index",
+                "my-type", Integer.toString(i))).actionGet();
+        Assert.assertEquals("message #" + i, response.getSource().get("data"));
+       }
+
+       node.close();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testEmptyTransportClient() throws Exception {
+
+       File dataDir = tempFolder.newFolder();
+
+       Node node = NodeBuilder.nodeBuilder()
+               .settings(Settings.settingsBuilder()
+                       .put("path.home", dataDir.getParent())
+                       .put("http.enabled", false)
+                       .put("path.data", dataDir.getAbsolutePath()))
+               // set a custom cluster name to verify that user config works 
correctly
+               .clusterName("my-transport-client-cluster")
+               .node();
+
+       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+       DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction());
+
+       Map<String, String> config = new HashMap<>();
+       // This instructs the sink to emit after every element, otherwise they 
would be buffered
+       config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+       config.put("cluster.name", "my-transport-client-cluster");
+
+       source.addSink(new ElasticsearchSink<>(config, new 
ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
+
+       env.execute("Elasticsearch TransportClient Test");
+
+       // verify the results
+       Client client = node.client();
+       for (int i = 0; i < NUM_ELEMENTS; i++) {
+        GetResponse response = client.get(new GetRequest("my-index",
+                "my-type", Integer.toString(i))).actionGet();
+        Assert.assertEquals("message #" + i, response.getSource().get("data"));
+       }
+
+       node.close();
+ }
+
+       @Test(expected = JobExecutionException.class)
+       public void testTransportClientFails() throws Exception{
+               // this checks whether the TransportClient fails early when 
there is no cluster to
+               // connect to. There isn't a similar test for the Node Client 
version since that
+               // one will block and wait for a cluster to come online
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
+
+               Map<String, String> config = new HashMap<>();
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
+               config.put("cluster.name", "my-node-client-cluster");
+
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(config, transports, new 
TestElasticsearchSinkFunction()));
+
+               env.execute("Elasticsearch Node Client Test");
+       }
+
+       private static class TestSourceFunction implements 
SourceFunction<Tuple2<Integer, String>> {
+               private static final long serialVersionUID = 1L;
+
+               private volatile boolean running = true;
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, String>> ctx) 
throws Exception {
+                       for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                               ctx.collect(Tuple2.of(i, "message #" + i));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+
+       private static class TestElasticsearchSinkFunction implements 
ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+               private static final long serialVersionUID = 1L;
+
+               public IndexRequest createIndexRequest(Tuple2<Integer, String> 
element) {
+                       Map<String, Object> json = new HashMap<>();
+                       json.put("data", element.f1);
+
+                       return Requests.indexRequest()
+                                       .index("my-index")
+                                       .type("my-type")
+                                       .id(element.f0.toString())
+                                       .source(json);
+               }
+
+               @Override
+               public void process(Tuple2<Integer, String> element, 
RuntimeContext ctx, RequestIndexer indexer) {
+                       indexer.add(createIndexRequest(element));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
new file mode 100644
index 0000000..05760e8
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+import 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you 
must ensure that
+ * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
+ */
+public class ElasticsearchExample {
+
+       public static void main(String[] args) throws Exception {
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SingleOutputStreamOperator<String> source =
+                               env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
+                                       /**
+                                        * The mapping method. Takes an element 
from the input data set and transforms
+                                        * it into exactly one element.
+                                        *
+                                        * @param value The input value.
+                                        * @return The transformed value
+                                        * @throws Exception This method may 
throw exceptions. Throwing an exception will cause the operation
+                                        *                   to fail and may 
trigger recovery.
+                                        */
+                                       @Override
+                                       public String map(Long value) throws 
Exception {
+                                               return "message #" + value;
+                                       }
+                               });
+
+               Map<String, String> config = new HashMap<>();
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
+
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(config, transports, new 
ElasticsearchSinkFunction<String>(){
+                       @Override
+                       public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
+                               indexer.add(createIndexRequest(element));
+                       }
+               }));
+
+               env.execute("Elasticsearch Example");
+       }
+
+       private static IndexRequest createIndexRequest(String element) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element);
+
+               return Requests.indexRequest()
+                               .index("my-index")
+                               .type("my-type")
+                               .id(element)
+                               .source(json);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..dc20726
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml
 
b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..7a077c2
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming.connectors.elasticsearch2" 
level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/pom.xml 
b/flink-connectors/flink-connector-filesystem/pom.xml
new file mode 100644
index 0000000..fbc830a
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/pom.xml
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-filesystem_2.10</artifactId>
+       <name>flink-connector-filesystem</name>
+
+       <packaging>jar</packaging>
+
+       <!--
+               This is a Hadoop2 only flink module.
+       -->
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-hadoop2</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-hadoop-compatibility_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-hdfs</artifactId>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+                       
<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-common</artifactId>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+                       
<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minikdc</artifactId>
+                       <version>${minikdc.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+
+                       <!--
+                               
https://issues.apache.org/jira/browse/DIRSHARED-134
+                               Required to pull the Mini-KDC transitive 
dependency
+                       -->
+                       <plugin>
+                               <groupId>org.apache.felix</groupId>
+                               <artifactId>maven-bundle-plugin</artifactId>
+                               <version>3.0.1</version>
+                               <inherited>true</inherited>
+                               <extensions>true</extensions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <!--
+                                       Enforce single threaded execution to 
avoid port conflicts when running
+                                       secure mini DFS cluster
+                                       -->
+                                       <forkCount>1</forkCount>
+                                       <reuseForks>false</reuseForks>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
new file mode 100644
index 0000000..3e3c86b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -0,0 +1,309 @@
+package org.apache.flink.streaming.connectors.fs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+* Implementation of AvroKeyValue writer that can be used in Sink.
+* Each entry would be wrapped in GenericRecord with key/value fields(same as 
in m/r lib)
+<pre>
+Usage:
+{@code
+               BucketingSink<Tuple2<Long, Long>> sink = new 
BucketingSink<Tuple2<Long, Long>>("/tmp/path");
+               sink.setBucketer(new DateTimeBucketer<Tuple2<Long, 
Long>>("yyyy-MM-dd/HH/mm/"));
+               sink.setPendingSuffix(".avro");
+               Map<String,String> properties = new HashMap<>();
+               Schema longSchema = Schema.create(Type.LONG);
+               String keySchema = longSchema.toString();
+               String valueSchema = longSchema.toString();
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, 
keySchema);
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, 
valueSchema);
+               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, 
Boolean.toString(true));
+               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, 
DataFileConstants.SNAPPY_CODEC);
+               
+               sink.setWriter(new AvroSinkWriter<Long, Long>(properties));
+               sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB,
+}
+</pre>
+*/
+public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, 
V>>  implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
+       private static final long serialVersionUID = 1L;
+       public static final String CONF_OUTPUT_KEY_SCHEMA = 
"avro.schema.output.key";
+       public static final String CONF_OUTPUT_VALUE_SCHEMA = 
"avro.schema.output.value";
+       public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS;
+       public static final String CONF_COMPRESS_CODEC = 
FileOutputFormat.COMPRESS_CODEC;
+       public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level";
+       public static final String CONF_XZ_LEVEL = "avro.xz.level";
+
+       private transient AvroKeyValueWriter<K, V> keyValueWriter;
+
+       private final Map<String, String> properties;
+
+       /**
+        * C'tor for the writer
+        * <p>
+        * You can provide different properties that will be used to configure 
avro key-value writer as simple properties map(see example above)
+        * @param properties
+        */
+       @SuppressWarnings("deprecation")
+       public AvroKeyValueSinkWriter(Map<String, String> properties) {
+               this.properties = properties;
+               
+               String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
+               if (keySchemaString == null) {
+                       throw new IllegalStateException("No key schema 
provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
+               }
+               Schema.parse(keySchemaString);//verifying that schema valid
+               
+               String valueSchemaString = 
properties.get(CONF_OUTPUT_VALUE_SCHEMA);
+               if (valueSchemaString == null) {
+                       throw new IllegalStateException("No value schema 
provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property");
+               }
+               Schema.parse(valueSchemaString);//verifying that schema valid
+       }
+
+       private boolean getBoolean(Map<String,String> conf, String key, boolean 
def) {
+               String value = conf.get(key);
+               if (value == null) {
+                       return def;
+               }
+               return Boolean.parseBoolean(value);
+       }
+       
+       private int getInt(Map<String,String> conf, String key, int def) {
+               String value = conf.get(key);
+               if (value == null) {
+                       return def;
+               }
+               return Integer.parseInt(value);
+       }
+
+       //this derived from AvroOutputFormatBase.getCompressionCodec(..)
+       private CodecFactory getCompressionCodec(Map<String,String> conf) {
+               if (getBoolean(conf, CONF_COMPRESS, false)) {
+                       int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, 
CodecFactory.DEFAULT_DEFLATE_LEVEL);
+                       int xzLevel = getInt(conf, CONF_XZ_LEVEL, 
CodecFactory.DEFAULT_XZ_LEVEL);
+
+                       String outputCodec = conf.get(CONF_COMPRESS_CODEC);
+
+                       if 
(DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
+                               return CodecFactory.deflateCodec(deflateLevel);
+                       } else if 
(DataFileConstants.XZ_CODEC.equals(outputCodec)) {
+                               return CodecFactory.xzCodec(xzLevel);
+                       } else {
+                               return CodecFactory.fromString(outputCodec);
+                       }
+               }
+               return CodecFactory.nullCodec();
+       }
+
+       @Override
+       @SuppressWarnings("deprecation")
+       public void open(FileSystem fs, Path path) throws IOException {
+               super.open(fs, path);
+
+               CodecFactory compressionCodec = getCompressionCodec(properties);
+               Schema keySchema = 
Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA));
+               Schema valueSchema = 
Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA));
+               keyValueWriter = new AvroKeyValueWriter<K, V>(keySchema, 
valueSchema, compressionCodec, getStream());
+       }
+
+       @Override
+       public void close() throws IOException {
+               super.close();//the order is important since super.close 
flushes inside
+               if (keyValueWriter != null) {
+                       keyValueWriter.close();
+               }
+       }
+       
+       @Override
+       public long flush() throws IOException {
+               if (keyValueWriter != null) {
+                       keyValueWriter.sync();
+               }
+               return super.flush();
+       }
+
+       @Override
+       public void write(Tuple2<K, V> element) throws IOException {
+               getStream(); // Throws if the stream is not open
+               keyValueWriter.write(element.f0, element.f1);
+       }
+
+       @Override
+       public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
+               if (!type.isTupleType()) {
+                       throw new IllegalArgumentException("Input 
TypeInformation is not a tuple type.");
+               }
+
+               TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;
+
+               if (tupleType.getArity() != 2) {
+                       throw new IllegalArgumentException("Input 
TypeInformation must be a Tuple2 type.");
+               }
+       }
+
+       @Override
+       public Writer<Tuple2<K, V>> duplicate() {
+               return new AvroKeyValueSinkWriter<K, V>(properties);
+       }
+       
+       // taken from m/r avro lib to remove dependency on it
+       private static final class AvroKeyValueWriter<K, V> {
+               /** A writer for the Avro container file. */
+               private final DataFileWriter<GenericRecord> mAvroFileWriter;
+
+               /**
+                * The writer schema for the generic record entries of the Avro
+                * container file.
+                */
+               private final Schema mKeyValuePairSchema;
+
+               /**
+                * A reusable Avro generic record for writing key/value pairs 
to the
+                * file.
+                */
+               private final AvroKeyValue<Object, Object> mOutputRecord;
+
+               AvroKeyValueWriter(Schema keySchema, Schema valueSchema,
+                               CodecFactory compressionCodec, OutputStream 
outputStream,
+                               int syncInterval) throws IOException {
+                       // Create the generic record schema for the key/value 
pair.
+                       mKeyValuePairSchema = AvroKeyValue
+                                       .getSchema(keySchema, valueSchema);
+
+                       // Create an Avro container file and a writer to it.
+                       DatumWriter<GenericRecord> genericDatumWriter = new 
GenericDatumWriter<GenericRecord>(
+                                       mKeyValuePairSchema);
+                       mAvroFileWriter = new DataFileWriter<GenericRecord>(
+                                       genericDatumWriter);
+                       mAvroFileWriter.setCodec(compressionCodec);
+                       mAvroFileWriter.setSyncInterval(syncInterval);
+                       mAvroFileWriter.create(mKeyValuePairSchema, 
outputStream);
+
+                       // Create a reusable output record.
+                       mOutputRecord = new AvroKeyValue<Object, Object>(
+                                       new 
GenericData.Record(mKeyValuePairSchema));
+               }
+
+               AvroKeyValueWriter(Schema keySchema, Schema valueSchema,
+                               CodecFactory compressionCodec, OutputStream 
outputStream)
+                               throws IOException {
+                       this(keySchema, valueSchema, compressionCodec, 
outputStream,
+                                       
DataFileConstants.DEFAULT_SYNC_INTERVAL);
+               }
+
+               void write(K key, V value) throws IOException {
+                       mOutputRecord.setKey(key);
+                       mOutputRecord.setValue(value);
+                       mAvroFileWriter.append(mOutputRecord.get());
+               }
+
+               void close() throws IOException {
+                       mAvroFileWriter.close();
+               }
+
+               long sync() throws IOException {
+                       return mAvroFileWriter.sync();
+               }
+       }
+
+       // taken from AvroKeyValue avro-mapr lib
+       public static class AvroKeyValue<K, V> {
+               /** The name of the key value pair generic record. */
+               public static final String KEY_VALUE_PAIR_RECORD_NAME = 
"KeyValuePair";
+
+               /** The namespace of the key value pair generic record. */
+               public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = 
"org.apache.avro.mapreduce";
+
+               /** The name of the generic record field containing the key. */
+               public static final String KEY_FIELD = "key";
+
+               /** The name of the generic record field containing the value. 
*/
+               public static final String VALUE_FIELD = "value";
+
+               /** The key/value generic record wrapped by this class. */
+               public final GenericRecord mKeyValueRecord;
+
+               /**
+                * Wraps a GenericRecord that is a key value pair.
+                */
+               public AvroKeyValue(GenericRecord keyValueRecord) {
+                       mKeyValueRecord = keyValueRecord;
+               }
+
+               public GenericRecord get() {
+                       return mKeyValueRecord;
+               }
+
+               public void setKey(K key) {
+                       mKeyValueRecord.put(KEY_FIELD, key);
+               }
+
+               public void setValue(V value) {
+                       mKeyValueRecord.put(VALUE_FIELD, value);
+               }
+
+               @SuppressWarnings("unchecked")
+               public K getKey() {
+                       return (K) mKeyValueRecord.get(KEY_FIELD);
+               }
+
+               @SuppressWarnings("unchecked")
+               public V getValue() {
+                       return (V) mKeyValueRecord.get(VALUE_FIELD);
+               }
+
+               /**
+                * Creates a KeyValuePair generic record schema.
+                * 
+                * @return A schema for a generic record with two fields: 'key' 
and
+                *         'value'.
+                */
+               public static Schema getSchema(Schema keySchema, Schema 
valueSchema) {
+                       Schema schema = 
Schema.createRecord(KEY_VALUE_PAIR_RECORD_NAME,
+                                       "A key/value pair", 
KEY_VALUE_PAIR_RECORD_NAMESPACE, false);
+                       schema.setFields(Arrays.asList(new 
Schema.Field(KEY_FIELD,
+                                       keySchema, "The key", null), new 
Schema.Field(VALUE_FIELD,
+                                       valueSchema, "The value", null)));
+                       return schema;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
new file mode 100644
index 0000000..24ad6ab
--- /dev/null
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * A bucketer is used with a {@link RollingSink}
+ * to put emitted elements into rolling files.
+ *
+ * <p>
+ * The {@code RollingSink} has one active bucket that it is writing to at a 
time. Whenever
+ * a new element arrives it will ask the {@code Bucketer} if a new bucket 
should be started and
+ * the old one closed. The {@code Bucketer} can, for example, decide to start 
new buckets
+ * based on system time.
+ *
+ * @deprecated use {@link 
org.apache.flink.streaming.connectors.fs.bucketing.Bucketer} instead.
+ */
+@Deprecated
+public interface Bucketer extends Serializable {
+
+       /**
+        * Returns {@code true} when a new bucket should be started.
+        *
+        * @param currentBucketPath The bucket {@code Path} that is currently 
being used.
+        */
+       boolean shouldStartNewBucket(Path basePath, Path currentBucketPath);
+
+       /**
+        * Returns the {@link Path} of a new bucket file.
+        *
+        * @param basePath The base path containing all the buckets.
+        *
+        * @return The complete new {@code Path} of the new bucket. This should 
include the {@code basePath}
+        *      and also the {@code subtaskIndex} tp avoid clashes with 
parallel sinks.
+        */
+       Path getNextBucketPath(Path basePath);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
new file mode 100644
index 0000000..174707c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+
+/**
+ * A clock that can provide the current time.
+ *
+ * <p>
+ * Normally this would be system time, but for testing a custom {@code Clock} 
can be provided.
+ */
+public interface Clock {
+
+       /**
+        * Return the current system time in milliseconds.
+        */
+       public long currentTimeMillis();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
new file mode 100644
index 0000000..0df8998
--- /dev/null
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * A {@link Bucketer} that assigns to buckets based on current system time.
+ *
+ * <p>
+ * The {@code DateTimeBucketer} will create directories of the following form:
+ * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
+ * that was specified as a base path when creating the
+ * {@link RollingSink}. The {@code dateTimePath}
+ * is determined based on the current system time and the user provided format 
string.
+ *
+ * <p>
+ * {@link SimpleDateFormat} is used to derive a date string from the current 
system time and
+ * the date format string. The default format string is {@code 
"yyyy-MM-dd--HH"} so the rolling
+ * files will have a granularity of hours.
+ *
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *     Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
+ * }</pre>
+ *
+ * This will create for example the following bucket path:
+ * {@code /base/1976-12-31-14/}
+ *
+ * @deprecated use {@link 
org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} instead.
+ */
+@Deprecated
+public class DateTimeBucketer implements Bucketer {
+
+       private static Logger LOG = 
LoggerFactory.getLogger(DateTimeBucketer.class);
+
+       private static final long serialVersionUID = 1L;
+
+       private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
+
+       // We have this so that we can manually set it for tests.
+       private static Clock clock = new SystemClock();
+
+       private final String formatString;
+
+       private transient SimpleDateFormat dateFormatter;
+
+       /**
+        * Creates a new {@code DateTimeBucketer} with format string {@code 
"yyyy-MM-dd--HH"}.
+        */
+       public DateTimeBucketer() {
+               this(DEFAULT_FORMAT_STRING);
+       }
+
+       /**
+        * Creates a new {@code DateTimeBucketer} with the given date/time 
format string.
+        *
+        * @param formatString The format string that will be given to {@code 
SimpleDateFormat} to determine
+        *                     the bucket path.
+        */
+       public DateTimeBucketer(String formatString) {
+               this.formatString = formatString;
+
+               this.dateFormatter = new SimpleDateFormat(formatString);
+       }
+
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
+
+               this.dateFormatter = new SimpleDateFormat(formatString);
+       }
+
+
+       @Override
+       public boolean shouldStartNewBucket(Path basePath, Path 
currentBucketPath) {
+               String newDateTimeString = dateFormatter.format(new 
Date(clock.currentTimeMillis()));
+               return !(new Path(basePath, 
newDateTimeString).equals(currentBucketPath));
+       }
+
+       @Override
+       public Path getNextBucketPath(Path basePath) {
+               String newDateTimeString = dateFormatter.format(new 
Date(clock.currentTimeMillis()));
+               return new Path(basePath + "/" + newDateTimeString);
+       }
+
+       @Override
+       public String toString() {
+               return "DateTimeBucketer{" +
+                               "formatString='" + formatString + '\'' +
+                               '}';
+       }
+
+       /**
+        * This sets the internal {@link Clock} implementation. This method 
should only be used for testing
+        *
+        * @param newClock The new clock to set.
+        */
+       public static void setClock(Clock newClock) {
+               clock = newClock;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
new file mode 100644
index 0000000..6854596
--- /dev/null
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link Bucketer} that does not perform any
+ * rolling of files. All files are written to the base path.
+ *
+ * @deprecated use {@link BasePathBucketer} instead.
+ */
+@Deprecated
+public class NonRollingBucketer implements Bucketer {
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public boolean shouldStartNewBucket(Path basePath, Path 
currentBucketPath) {
+               return false;
+       }
+
+       @Override
+       public Path getNextBucketPath(Path basePath) {
+               return basePath;
+       }
+
+       @Override
+       public String toString() {
+               return "NonRollingBucketer";
+       }
+}

Reply via email to