[
https://issues.apache.org/jira/browse/FLINK-3115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204203#comment-15204203
]
ASF GitHub Bot commented on FLINK-3115:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/1792#discussion_r56823244
--- Diff:
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements in bulk to an Elasticsearch cluster.
+ *
+ * <p>
+ * When using the first constructor {@link
#ElasticsearchSink(java.util.Map, ElasticsearchSinkFunction)}
+ * the sink will create a local {@link Node} for communicating with the
+ * Elasticsearch cluster. When using the second constructor
+ * {@link #ElasticsearchSink(java.util.Map, java.util.List,
ElasticsearchSinkFunction)} a {@link TransportClient} will
+ * be used instead.
+ *
+ * <p>
+ * <b>Attention: </b> When using the {@code TransportClient} the sink will
fail if no cluster
+ * can be connected to. With the {@code Node Client} the sink will block
and wait for a cluster
+ * to come online.
+ *
+ * <p>
+ * The {@link Map} passed to the constructor is forwarded to Elasticsearch
when creating
+ * the {@link Node} or {@link TransportClient}. The config keys can be
found in the Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, this
should be set to the name
+ * of the cluster that the sink should emit to.
+ *
+ * <p>
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link
IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to
buffer
+ * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in
megabytes) to buffer
+ * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data
regardless of the other two
+ * settings in milliseconds
+ * </ul>
+ *
+ * <p>
+ * You also have to provide an {@link RequestIndexer}. This is used to
create an
+ * {@link IndexRequest} from an element that needs to be added to
Elasticsearch. See
+ * {@link RequestIndexer} for an example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS =
"bulk.flush.max.actions";
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB =
"bulk.flush.max.size.mb";
+ public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS =
"bulk.flush.interval.ms";
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchSink.class);
+
+ /**
+ * The user specified config map that we forward to Elasticsearch when
we create the Client.
+ */
+ private final Map<String, String> userConfig;
+
+ /**
+ * The list of nodes that the TransportClient should connect to. This
is null if we are using
+ * an embedded Node to get a Client.
+ */
+ private final List<InetSocketAddress> transportAddresses;
+
+ /**
+ * The builder that is used to construct an {@link IndexRequest} from
the incoming element.
+ */
+ private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+ /**
+ * The embedded Node that is used to communicate with the Elasticsearch
cluster. This is null
+ * if we are using a TransportClient.
+ */
+ private transient Node node;
+
+ /**
+ * The Client that was either retrieved from a Node or is a
TransportClient.
+ */
+ private transient Client client;
+
+ /**
+ * Bulk processor that was created using the client
+ */
+ private transient BulkProcessor bulkProcessor;
+
+ /**
+ * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
+ */
+ private transient RequestIndexer requestIndexer;
+
+ /**
+ * This is set from inside the BulkProcessor listener if there were
failures in processing.
+ */
+ private final AtomicBoolean hasFailure = new AtomicBoolean(false);
+
+ /**
+ * This is set from inside the BulkProcessor listener if a Throwable
was thrown during processing.
+ */
+ private final AtomicReference<Throwable> failureThrowable = new
AtomicReference<>();
+
+ /**
+ * Creates a new ElasticsearchSink that connects to the cluster using
an embedded Node.
+ *
+ * @param userConfig The map of user settings that are passed when
constructing the Node and BulkProcessor
+ * @param elasticsearchSinkFunction This is used to generate the
IndexRequest from the incoming element
+ */
+ public ElasticsearchSink(Map<String, String> userConfig,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+ this.userConfig = userConfig;
+ this.elasticsearchSinkFunction = elasticsearchSinkFunction;
+ transportAddresses = null;
+ }
+
+ /**
+ * Creates a new ElasticsearchSink that connects to the cluster using a
TransportClient.
+ *
+ * @param userConfig The map of user settings that are passed when
constructing the TransportClient and BulkProcessor
+ * @param transportAddresses The Elasticsearch Nodes to which to
connect using a {@code TransportClient}
+ * @param elasticsearchSinkFunction This is used to generate the
ActionRequest from the incoming element
+ *
+ */
+ public ElasticsearchSink(Map<String, String> userConfig,
List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T>
elasticsearchSinkFunction) {
+ this.userConfig = userConfig;
+ this.elasticsearchSinkFunction = elasticsearchSinkFunction;
+ this.transportAddresses = transportAddresses;
--- End diff --
I would add a check that the transportAddresses are not null and that their size is > 0.
> Update Elasticsearch connector to 2.X
> -------------------------------------
>
> Key: FLINK-3115
> URL: https://issues.apache.org/jira/browse/FLINK-3115
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 0.10.0, 1.0.0, 0.10.1
> Reporter: Maximilian Michels
> Assignee: Suneel Marthi
> Fix For: 1.0.1
>
>
> The Elasticsearch connector is not up to date anymore. In version 2.X the API
> changed. The code needs to be adapted. Probably it makes sense to have a new
> class {{ElasticsearchSink2}}.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)