[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15818625#comment-15818625 ]
ASF GitHub Bot commented on FLINK-4988:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2767#discussion_r95601743
--- Diff: flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements in bulk to an Elasticsearch cluster.
+ * <p>
+ * The first {@link Map} passed to the constructor is forwarded to Elasticsearch when creating the
+ * {@link TransportClient}. The config keys can be found in the Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, which should be set to the name
+ * of the cluster that the sink should emit to.
+ * <p>
+ * <b>Attention: </b> When using the {@code TransportClient}, the sink will fail if no cluster
+ * can be connected to.
+ * <p>
+ * The second {@link Map} is used to configure a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ * <li> {@code bulk.flush.max.actions}: Maximum number of elements to buffer
+ * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ * <li> {@code bulk.flush.interval.ms}: Interval, in milliseconds, at which to flush data regardless of
+ * the other two settings
+ * </ul>
+ * <p>
+ * You also have to provide a {@link RequestIndexer}. This is used to create an
+ * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
+ * {@link RequestIndexer} for an example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+ public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
+
+ /**
+ * The user-specified config map that we forward to Elasticsearch when we create the Client.
+ */
+ private final Map<String, String> esConfig;
+
+ /**
+ * The user specified config map that we use to configure BulkProcessor.
+ */
+ private final Map<String, String> sinkConfig;
+
+ /**
+ * The list of nodes that the TransportClient should connect to. This is null if we are using
+ * an embedded Node to get a Client.
+ */
+ private final List<InetSocketAddress> transportAddresses;
+
+ /**
+ * The builder that is used to construct an {@link IndexRequest} from the incoming element.
+ */
+ private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+ /**
+ * The Client that was either retrieved from a Node or is a TransportClient.
+ */
+ private transient Client client;
+
+ /**
+ * Bulk processor that was created using the client
+ */
+ private transient BulkProcessor bulkProcessor;
+
+ /**
+ * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
+ */
+ private transient RequestIndexer requestIndexer;
+
+ /**
+ * This is set from inside the BulkProcessor listener if there were failures in processing.
+ */
+ private final AtomicBoolean hasFailure = new AtomicBoolean(false);
+
+ /**
+ * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
+ */
+ private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+ /**
+ * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+ *
+ * @param esConfig The map of user settings that are passed when constructing the TransportClient
+ * @param sinkConfig The map of user settings that are passed when constructing the BulkProcessor
+ * @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient}
+ * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element
+ */
+ public ElasticsearchSink(Map<String, String> esConfig, Map<String, String> sinkConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
--- End diff --
I see, thanks for the explanation! I think we can resolve this by keeping a
single Map for user configuration at the API level and separating out the
bulk processor settings internally.
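A minimal sketch of what "separating out the bulk processor settings internally" could look like, assuming the constructor takes one `Map<String, String>` that mixes `bulk.flush.*` keys with client settings such as `cluster.name`. The class `SinkConfigSplitter` and its fields are hypothetical and not part of the PR; the parsed values would presumably then be handed to the `BulkProcessor.Builder` setters (`setBulkActions`, `setBulkSize`, `setFlushInterval`), and the remaining map to the client `Settings`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of the PR): takes the single user-facing
 * config map and separates the bulk processor settings from the settings
 * that would be forwarded to the Elasticsearch TransportClient.
 */
public final class SinkConfigSplitter {

    // Same key strings as the constants declared in ElasticsearchSink.
    static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";

    // Parsed bulk settings; null means "not configured, keep the BulkProcessor default".
    final Integer bulkFlushMaxActions;
    final Integer bulkFlushMaxSizeMb;
    final Long bulkFlushIntervalMs;

    // Everything that is not a bulk.flush.* key, destined for the client settings.
    final Map<String, String> clientConfig;

    SinkConfigSplitter(Map<String, String> userConfig) {
        // Copy so the caller's map is never mutated.
        Map<String, String> remaining = new HashMap<>(userConfig);

        // Map.remove returns the value (or null) and strips the key in one step,
        // so the bulk keys never leak into the client configuration.
        String maxActions = remaining.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
        String maxSizeMb = remaining.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
        String intervalMs = remaining.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);

        // Malformed numbers fail fast with a NumberFormatException at construction time.
        this.bulkFlushMaxActions = maxActions == null ? null : Integer.valueOf(maxActions);
        this.bulkFlushMaxSizeMb = maxSizeMb == null ? null : Integer.valueOf(maxSizeMb);
        this.bulkFlushIntervalMs = intervalMs == null ? null : Long.valueOf(intervalMs);
        this.clientConfig = Collections.unmodifiableMap(remaining);
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("cluster.name", "my-cluster");
        userConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");

        SinkConfigSplitter split = new SinkConfigSplitter(userConfig);
        System.out.println(split.clientConfig);        // {cluster.name=my-cluster}
        System.out.println(split.bulkFlushMaxActions); // 1
        System.out.println(split.bulkFlushIntervalMs); // null
    }
}
```

Keeping the split inside the sink means users only ever deal with one map, while the sink can still validate the bulk settings eagerly instead of passing unknown `bulk.flush.*` keys on to Elasticsearch.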
> Elasticsearch 5.x support
> -------------------------
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
> Issue Type: New Feature
> Reporter: Mike Dias
>
> Elasticsearch 5.x was released:
> https://www.elastic.co/blog/elasticsearch-5-0-0-released
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)