Repository: flink
Updated Branches:
  refs/heads/master 6a456c673 -> 3505316c6


[FLINK-4480] [elasticsearch connector] Fix link to elastic.co in documentation

This also includes minor code and test cleanups.

This closes #2416


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3505316c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3505316c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3505316c

Branch: refs/heads/master
Commit: 3505316c6446606555f4ae7cc18ba5e872ff687d
Parents: 53f5a8c
Author: smarthi <[email protected]>
Authored: Wed Aug 24 18:07:23 2016 -0400
Committer: Stephan Ewen <[email protected]>
Committed: Fri Aug 26 17:53:19 2016 +0200

----------------------------------------------------------------------
 docs/dev/connectors/index.md                             |  2 +-
 .../elasticsearch/examples/ElasticsearchExample.java     |  3 +--
 .../flink-connector-elasticsearch2/pom.xml               |  2 +-
 .../elasticsearch2/examples/ElasticsearchExample.java    | 11 +++++++++--
 4 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3505316c/docs/dev/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md
index c49c8c2..59b5e7b 100644
--- a/docs/dev/connectors/index.md
+++ b/docs/dev/connectors/index.md
@@ -31,7 +31,7 @@ Currently these systems are supported:
 
  * [Apache Kafka](https://kafka.apache.org/) (sink/source)
  * [Elasticsearch](https://elastic.co/) (sink)
- * [Elasticsearch 2x](https://elastic.com) (sink)
+ * [Elasticsearch 2x](https://elastic.co/) (sink)
  * [Hadoop FileSystem](http://hadoop.apache.org) (sink)
  * [RabbitMQ](http://www.rabbitmq.com/) (sink/source)
  * [Amazon Kinesis Streams](http://aws.amazon.com/kinesis/streams/) (sink/source)

http://git-wip-us.apache.org/repos/asf/flink/blob/3505316c/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
index f8f658b..136ae77 100644
--- a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch.examples;
 
-import com.google.common.collect.Maps;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -58,7 +57,7 @@ public class ElasticsearchExample {
                        }
                });
 
-               Map<String, String> config = Maps.newHashMap();
+               Map<String, String> config = new HashMap<>();
                // This instructs the sink to emit after every element, otherwise they would be buffered
                config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3505316c/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml b/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
index cb8ad62..51f55b3 100644
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
+++ b/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
        <!-- Allow users to pass custom connector versions -->
        <properties>
-               <elasticsearch.version>2.2.1</elasticsearch.version>
+               <elasticsearch.version>2.3.5</elasticsearch.version>
        </properties>
 
        <dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/3505316c/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
index 77520b5..05760e8 100644
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
+++ b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
@@ -26,7 +26,11 @@ import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Requests;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -60,7 +64,10 @@ public class ElasticsearchExample {
                // This instructs the sink to emit after every element, otherwise they would be buffered
                config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
 
-               source.addSink(new ElasticsearchSink<>(config, null, new ElasticsearchSinkFunction<String>(){
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(config, transports, new ElasticsearchSinkFunction<String>(){
                        @Override
                        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                                indexer.add(createIndexRequest(element));
@@ -70,7 +77,7 @@ public class ElasticsearchExample {
                env.execute("Elasticsearch Example");
        }
 
-       public static IndexRequest createIndexRequest(String element) {
+       private static IndexRequest createIndexRequest(String element) {
                Map<String, Object> json = new HashMap<>();
                json.put("data", element);
 

Reply via email to