[FLINK-4988] Elasticsearch 5.x support
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8699b03d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8699b03d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8699b03d Branch: refs/heads/master Commit: 8699b03d79a441ca33d9f62b96490d29a0efaf44 Parents: b452c8b Author: Mike Dias <[email protected]> Authored: Mon Nov 7 18:09:48 2016 -0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue Feb 7 22:45:45 2017 +0800 ---------------------------------------------------------------------- docs/dev/connectors/elasticsearch5.md | 146 +++++++++++ docs/dev/connectors/filesystem_sink.md | 2 +- docs/dev/connectors/index.md | 1 - docs/dev/connectors/nifi.md | 2 +- docs/dev/connectors/rabbitmq.md | 2 +- docs/dev/connectors/twitter.md | 2 +- .../flink-connector-elasticsearch5/pom.xml | 93 +++++++ .../elasticsearch5/BulkProcessorIndexer.java | 35 +++ .../elasticsearch5/ElasticsearchSink.java | 259 +++++++++++++++++++ .../ElasticsearchSinkFunction.java | 60 +++++ .../elasticsearch5/RequestIndexer.java | 25 ++ .../elasticsearch5/ElasticsearchSinkITCase.java | 200 ++++++++++++++ .../examples/ElasticsearchExample.java | 83 ++++++ .../src/test/resources/log4j2.properties | 27 ++ .../src/test/resources/logback-test.xml | 30 +++ flink-connectors/pom.xml | 3 +- 16 files changed, 964 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/elasticsearch5.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/elasticsearch5.md b/docs/dev/connectors/elasticsearch5.md new file mode 100644 index 0000000..2673d86 --- /dev/null +++ b/docs/dev/connectors/elasticsearch5.md @@ -0,0 +1,146 @@ +--- +title: "Elasticsearch 5.x Connector" +nav-title: Elasticsearch 5.x +nav-parent_id: connectors +nav-pos: 6 +--- +<!-- +Licensed to 
the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +This connector provides a Sink that can write to an +[Elasticsearch 5.x](https://elastic.co/) Index. To use this connector, add the +following dependency to your project: + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</artifactId> + <version>{{site.version }}</version> +</dependency> +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution) +for information about how to package the program with the libraries for +cluster execution. + +#### Installing Elasticsearch 5.x + +Instructions for setting up an Elasticsearch cluster can be found + [here](https://www.elastic.co/guide/en/elasticsearch/reference/5.x/setup.html). +Make sure to set and remember a cluster name. This must be set when +creating a Sink for writing to your cluster. + +#### Elasticsearch 5.x Sink +The connector provides a Sink that can send data to an Elasticsearch 5.x Index. 
+ +The sink communicates with Elasticsearch via a Transport Client. + +See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.x/transport-client.html) +for information about the Transport Client. + +The code below shows how to create a sink that uses a `TransportClient` for communication: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +File dataDir = ....; + +DataStream<String> input = ...; + +Map<String, String> esConfig = new HashMap<>(); +esConfig.put("cluster.name", "my-cluster-name"); + +// This instructs the sink to emit after every element, otherwise they would be buffered +Map<String, String> sinkConfig = new HashMap<>(); +sinkConfig.put("bulk.flush.max.actions", "1"); + +List<InetSocketAddress> transports = new ArrayList<>(); +transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); +transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300)); + +input.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction<String>() { + public IndexRequest createIndexRequest(String element) { + Map<String, String> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .source(json); + } + + @Override + public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { + indexer.add(createIndexRequest(element)); + } +})); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val dataDir = ....; + +val input: DataStream[String] = ... 
+ +val esConfig = new util.HashMap[String, String] +esConfig.put("cluster.name", "my-cluster-name") + +val sinkConfig = new util.HashMap[String, String] +sinkConfig.put("bulk.flush.max.actions", "1") + +val transports = new util.ArrayList[InetSocketAddress] +transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)) +transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300)) + +input.addSink(new ElasticsearchSink(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction[String] { + def createIndexRequest(element: String): IndexRequest = { + val json = new util.HashMap[String, AnyRef] + json.put("data", element) + Requests.indexRequest.index("my-index").`type`("my-type").source(json) + } + + override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) { + indexer.add(createIndexRequest(element)) + } +})) +{% endhighlight %} +</div> +</div> + +The first Map of Strings is used to configure the Transport Client. The configuration keys +are documented in the Elasticsearch documentation +[here](https://www.elastic.co/guide/en/elasticsearch/reference/5.x/index.html). +Especially important is the `cluster.name` parameter that must correspond to +the name of your cluster. + +The second Map of Strings is used to configure a `BulkProcessor` to send Action requests to the cluster. +This will buffer elements and Action Requests before sending to the cluster. The behaviour of the +`BulkProcessor` can be configured using these config keys: + * **bulk.flush.max.actions**: Maximum number of elements to buffer + * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer + * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two + settings in milliseconds + +The third argument is a list of Elasticsearch nodes +to which the sink should connect via a `TransportClient`. + +More information about Elasticsearch can be found [here](https://elastic.co). 
+ http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/filesystem_sink.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md index 0fa8bb1..bcaeb17 100644 --- a/docs/dev/connectors/filesystem_sink.md +++ b/docs/dev/connectors/filesystem_sink.md @@ -2,7 +2,7 @@ title: "HDFS Connector" nav-title: Rolling File Sink nav-parent_id: connectors -nav-pos: 6 +nav-pos: 7 --- <!-- Licensed to the Apache Software Foundation (ASF) under one http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/index.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md index ec0725a..f5c3eec 100644 --- a/docs/dev/connectors/index.md +++ b/docs/dev/connectors/index.md @@ -31,7 +31,6 @@ Currently these systems are supported: (Please select the respective documentati * [Apache Kafka](https://kafka.apache.org/) (sink/source) * [Elasticsearch](https://elastic.co/) (sink) - * [Elasticsearch 2x](https://elastic.co/) (sink) * [Hadoop FileSystem](http://hadoop.apache.org) (sink) * [RabbitMQ](http://www.rabbitmq.com/) (sink/source) * [Amazon Kinesis Streams](http://aws.amazon.com/kinesis/streams/) (sink/source) http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/nifi.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/nifi.md b/docs/dev/connectors/nifi.md index aa9eba2..8223867 100644 --- a/docs/dev/connectors/nifi.md +++ b/docs/dev/connectors/nifi.md @@ -2,7 +2,7 @@ title: "Apache NiFi Connector" nav-title: NiFi nav-parent_id: connectors -nav-pos: 8 +nav-pos: 9 --- <!-- Licensed to the Apache Software Foundation (ASF) under one http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/rabbitmq.md 
---------------------------------------------------------------------- diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md index c94c99d..b4da248 100644 --- a/docs/dev/connectors/rabbitmq.md +++ b/docs/dev/connectors/rabbitmq.md @@ -2,7 +2,7 @@ title: "RabbitMQ Connector" nav-title: RabbitMQ nav-parent_id: connectors -nav-pos: 7 +nav-pos: 8 --- <!-- Licensed to the Apache Software Foundation (ASF) under one http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/twitter.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md index be15aaf..f1fbbd4 100644 --- a/docs/dev/connectors/twitter.md +++ b/docs/dev/connectors/twitter.md @@ -2,7 +2,7 @@ title: "Twitter Connector" nav-title: Twitter nav-parent_id: connectors -nav-pos: 9 +nav-pos: 10 --- <!-- Licensed to the Apache Software Foundation (ASF) under one http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/pom.xml b/flink-connectors/flink-connector-elasticsearch5/pom.xml new file mode 100644 index 0000000..8fc5c8b --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5/pom.xml @@ -0,0 +1,93 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-elasticsearch5_2.10</artifactId> + <name>flink-connector-elasticsearch5</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <elasticsearch.version>5.0.0</elasticsearch.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>transport</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>2.7</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.7</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + + <!-- test dependencies --> + + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java new file mode 100644 index 0000000..f7ca499 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.elasticsearch5; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkProcessor; + +public class BulkProcessorIndexer implements RequestIndexer { + private final BulkProcessor bulkProcessor; + + public BulkProcessorIndexer(BulkProcessor bulkProcessor) { + this.bulkProcessor = bulkProcessor; + } + + @Override + public void add(ActionRequest... actionRequests) { + for (ActionRequest actionRequest : actionRequests) { + this.bulkProcessor.add(actionRequest); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java new file mode 100644 index 0000000..29c69c4 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.elasticsearch5; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.Preconditions; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.transport.Netty3Plugin; +import org.elasticsearch.transport.client.PreBuiltTransportClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Sink that emits its input elements in bulk to an Elasticsearch cluster. 
+ * <p> + * <p> + * The first {@link Map} passed to the constructor is forwarded to Elasticsearch when creating + * {@link TransportClient}. The config keys can be found in the Elasticsearch + * documentation. An important setting is {@code cluster.name}, this should be set to the name + * of the cluster that the sink should emit to. + * <p> + * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster + * can be connected to. + * <p> + * The second {@link Map} is used to configure a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}. + * This will buffer elements before sending a request to the cluster. The behaviour of the + * {@code BulkProcessor} can be configured using these config keys: + * <ul> + * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer + * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer + * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two + * settings in milliseconds + * </ul> + * <p> + * <p> + * You also have to provide an {@link RequestIndexer}. This is used to create an + * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See + * {@link RequestIndexer} for an example. + * + * @param <T> Type of the elements emitted by this sink + */ +public class ElasticsearchSink<T> extends RichSinkFunction<T> { + + public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; + public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; + public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class); + + /** + * The user specified config map that we forward to Elasticsearch when we create the Client. 
+ */ + private final Map<String, String> esConfig; + + /** + * The user specified config map that we use to configure BulkProcessor. + */ + private final Map<String, String> sinkConfig; + + /** + * The list of nodes that the TransportClient should connect to. This is null if we are using + * an embedded Node to get a Client. + */ + private final List<InetSocketAddress> transportAddresses; + + /** + * The builder that is used to construct an {@link IndexRequest} from the incoming element. + */ + private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction; + + /** + * The Client that was either retrieved from a Node or is a TransportClient. + */ + private transient Client client; + + /** + * Bulk processor that was created using the client + */ + private transient BulkProcessor bulkProcessor; + + /** + * Bulk {@link org.elasticsearch.action.ActionRequest} indexer + */ + private transient RequestIndexer requestIndexer; + + /** + * This is set from inside the BulkProcessor listener if there where failures in processing. + */ + private final AtomicBoolean hasFailure = new AtomicBoolean(false); + + /** + * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing. + */ + private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); + + /** + * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient. 
+ * + * @param esConfig The map of user settings that are passed when constructing the TransportClient + * @param sinkConfig The map of user settings that are passed when constructing the BulkProcessor + * @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient} + * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element + */ + public ElasticsearchSink(Map<String, String> esConfig, Map<String, String> sinkConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + this.esConfig = esConfig; + this.sinkConfig = sinkConfig; + this.elasticsearchSinkFunction = elasticsearchSinkFunction; + Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0); + this.transportAddresses = transportAddresses; + } + + /** + * Initializes the connection to Elasticsearch by creating a + * {@link org.elasticsearch.client.transport.TransportClient}. 
+ */ + @Override + public void open(Configuration configuration) { + List<TransportAddress> transportNodes; + transportNodes = new ArrayList<>(transportAddresses.size()); + for (InetSocketAddress address : transportAddresses) { + transportNodes.add(new InetSocketTransportAddress(address)); + } + + Settings settings = Settings.builder().put(esConfig) + .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME) + .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME) + .build(); + + TransportClient transportClient = new PreBuiltTransportClient(settings); + for (TransportAddress transport : transportNodes) { + transportClient.addTransportAddress(transport); + } + + // verify that we actually are connected to a cluster + if (transportClient.connectedNodes().isEmpty()) { + throw new RuntimeException("Client is not connected to any Elasticsearch nodes!"); + } + + client = transportClient; + + if (LOG.isInfoEnabled()) { + LOG.info("Created Elasticsearch TransportClient {}", client); + } + + BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { + for (BulkItemResponse itemResp : response.getItems()) { + if (itemResp.isFailed()) { + LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); + failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); + } + } + hasFailure.set(true); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + LOG.error(failure.getMessage()); + failureThrowable.compareAndSet(null, failure); + hasFailure.set(true); + } + }); + + // This makes flush() blocking + bulkProcessorBuilder.setConcurrentRequests(0); + + ParameterTool params = 
ParameterTool.fromMap(sinkConfig); + + if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { + bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)); + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) { + bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt( + CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB)); + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) { + bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS))); + } + + bulkProcessor = bulkProcessorBuilder.build(); + requestIndexer = new BulkProcessorIndexer(bulkProcessor); + } + + @Override + public void invoke(T element) { + elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer); + } + + @Override + public void close() { + if (bulkProcessor != null) { + bulkProcessor.close(); + bulkProcessor = null; + } + + if (client != null) { + client.close(); + } + + if (hasFailure.get()) { + Throwable cause = failureThrowable.get(); + if (cause != null) { + throw new RuntimeException("An error occurred in ElasticsearchSink.", cause); + } else { + throw new RuntimeException("An error occurred in ElasticsearchSink."); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java new file mode 100644 index 0000000..752a83e --- /dev/null +++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.elasticsearch5; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.io.Serializable; + +/** + * Method that creates an {@link org.elasticsearch.action.ActionRequest} from an element in a Stream. + * + * <p> + * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch. 
+ * + * <p> + * Example: + * + * <pre>{@code + * private static class TestElasticSearchSinkFunction implements + * ElasticsearchSinkFunction<Tuple2<Integer, String>> { + * + * public IndexRequest createIndexRequest(Tuple2<Integer, String> element) { + * Map<String, Object> json = new HashMap<>(); + * json.put("data", element.f1); + * + * return Requests.indexRequest() + * .index("my-index") + * .type("my-type") + * .id(element.f0.toString()) + * .source(json); + * } + * + * public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) { + * indexer.add(createIndexRequest(element)); + * } + * } + * + * }</pre> + * + * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction} + */ +public interface ElasticsearchSinkFunction<T> extends Serializable, Function { + void process(T element, RuntimeContext ctx, RequestIndexer indexer); +} http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java new file mode 100644 index 0000000..170df31 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.elasticsearch5; + +import org.elasticsearch.action.ActionRequest; + +import java.io.Serializable; + +public interface RequestIndexer extends Serializable { + void add(ActionRequest... actionRequests); +} http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java new file mode 100644 index 0000000..b4a370b --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.elasticsearch5; + +import com.google.common.collect.ImmutableMap; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.internal.InternalSettingsPreparer; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.Netty3Plugin; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import 
static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase { + + private static final int NUM_ELEMENTS = 20; + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testTransportClient() throws Exception { + + File dataDir = tempFolder.newFolder(); + + Settings settings = Settings.builder() + .put("cluster.name", "my-transport-client-cluster") + .put("http.enabled", false) + .put("path.home", dataDir.getParent()) + .put("path.data", dataDir.getAbsolutePath()) + .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME) + .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME) + .build(); + + Node node = new PluginNode(settings); + node.start(); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); + + Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster"); + Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new TestElasticsearchSinkFunction())); + + env.execute("Elasticsearch TransportClient Test"); + + // verify the results + Client client = node.client(); + for (int i = 0; i < NUM_ELEMENTS; i++) { + GetResponse response = client.prepareGet("my-index", "my-type", Integer.toString(i)).get(); + assertEquals("message #" + i, response.getSource().get("data")); + } + + node.close(); + } + + @Test(expected = IllegalArgumentException.class) + public void testNullTransportClient() throws Exception { + + final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); + + Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster"); + Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, null, new TestElasticsearchSinkFunction())); + + fail(); + } + + @Test(expected = IllegalArgumentException.class) + public void testEmptyTransportClient() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); + + Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster"); + Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, new ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction())); + + env.execute("Elasticsearch TransportClient Test"); + + fail(); + } + + @Test(expected = JobExecutionException.class) + public void testTransportClientFails() throws Exception { + // this checks whether the TransportClient fails early when there is no cluster to + // connect to. 
There isn't a similar test for the Node Client version since that + // one will block and wait for a cluster to come online + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); + + Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster"); + Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new TestElasticsearchSinkFunction())); + + env.execute("Elasticsearch Node Client Test"); + + fail(); + } + + private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { + for (int i = 0; i < NUM_ELEMENTS && running; i++) { + ctx.collect(Tuple2.of(i, "message #" + i)); + } + } + + @Override + public void cancel() { + running = false; + } + } + + private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> { + private static final long serialVersionUID = 1L; + + public IndexRequest createIndexRequest(Tuple2<Integer, String> element) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element.f1); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .id(element.f0.toString()) + .source(json); + } + + @Override + public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) { + indexer.add(createIndexRequest(element)); + } + } + + private static class PluginNode extends Node { + public 
PluginNode(Settings settings) { + super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? extends Plugin>>singletonList(Netty3Plugin.class)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java new file mode 100644 index 0000000..47ce846 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.elasticsearch5.examples; + +import com.google.common.collect.ImmutableMap; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch5.RequestIndexer; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that + * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. 
+ */ +public class ElasticsearchExample { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator<String> source = + env.generateSequence(0, 20).map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); + + Map<String, String> esConfig = ImmutableMap.of("cluster.name", "elasticsearch"); + + // This instructs the sink to emit after every element, otherwise they would be buffered + Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction<String>() { + @Override + public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { + indexer.add(createIndexRequest(element)); + } + })); + + env.execute("Elasticsearch Example"); + } + + private static IndexRequest createIndexRequest(String element) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .id(element) + .source(json); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties new file mode 100644 index 0000000..dc20726 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties @@ -0,0 +1,27 @@ 
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements. See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership. The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License. You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# The file name log4j2.properties is picked up by Log4j 2, which only understands the
+# Log4j 2 properties syntax; Log4j 1.x keys such as log4j.rootLogger are not applied.
+rootLogger.level = OFF
+rootLogger.appenderRef.testlogger.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = Console
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = ERROR
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8893f7c
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+
~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <!-- Only WARN and above are written to the STDOUT console appender. --> <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <!-- NOTE(review): same level as the root logger, so this entry does not change what is printed; presumably kept as a hook for raising the connector's log level while debugging. --> <logger name="org.apache.flink.streaming.connectors.elasticsearch5" level="WARN"/> +</configuration> http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 91ee6af..e19c77f 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -47,6 +47,7 @@ under the License. 
<module>flink-connector-kafka-0.10</module> <module>flink-connector-elasticsearch</module> <module>flink-connector-elasticsearch2</module> + <module>flink-connector-elasticsearch5</module> <module>flink-connector-rabbitmq</module> <module>flink-connector-twitter</module> <module>flink-connector-nifi</module> @@ -86,5 +87,5 @@ under the License. </modules> </profile> </profiles> - + </project>
