http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java new file mode 100644 index 0000000..098afa9 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.util.Preconditions; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.node.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + +/** + * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x. + */ +public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge { + + private static final long serialVersionUID = -2632363720584123682L; + + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch1ApiCallBridge.class); + + /** User-provided transport addresses. This is null if we are using an embedded {@link Node} for communication. */ + private final List<TransportAddress> transportAddresses; + + /** The embedded {@link Node} used for communication. This is null if we are using a TransportClient. */ + private transient Node node; + + /** + * Constructor for use of an embedded {@link Node} for communication with the Elasticsearch cluster. + */ + Elasticsearch1ApiCallBridge() { + this.transportAddresses = null; + } + + /** + * Constructor for use of a {@link TransportClient} for communication with the Elasticsearch cluster. 
+ */ + Elasticsearch1ApiCallBridge(List<TransportAddress> transportAddresses) { + Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty()); + this.transportAddresses = transportAddresses; + } + + @Override + public Client createClient(Map<String, String> clientConfig) { + if (transportAddresses == null) { + + // Make sure that we disable http access to our embedded node + Settings settings = settingsBuilder() + .put(clientConfig) + .put("http.enabled", false) + .build(); + + node = nodeBuilder() + .settings(settings) + .client(true) + .data(false) + .node(); + + Client client = node.client(); + + if (LOG.isInfoEnabled()) { + LOG.info("Created Elasticsearch client from embedded node"); + } + + return client; + } else { + Settings settings = settingsBuilder() + .put(clientConfig) + .build(); + + TransportClient transportClient = new TransportClient(settings); + for (TransportAddress transport: transportAddresses) { + transportClient.addTransportAddress(transport); + } + + // verify that we actually are connected to a cluster + if (transportClient.connectedNodes().isEmpty()) { + throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!"); + } + + if (LOG.isInfoEnabled()) { + LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes()); + } + + return transportClient; + } + } + + @Override + public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { + if (!bulkItemResponse.isFailed()) { + return null; + } else { + return new RuntimeException(bulkItemResponse.getFailureMessage()); + } + } + + @Override + public void cleanup() { + if (node != null && !node.isClosed()) { + node.close(); + node = null; + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java index ac14ade..c338860 100644 --- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java @@ -17,59 +17,38 @@ package org.apache.flink.streaming.connectors.elasticsearch; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.collect.ImmutableList; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import 
org.elasticsearch.node.Node; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.node.NodeBuilder.nodeBuilder; - /** - * Sink that emits its input elements to an Elasticsearch cluster. + * Elasticsearch 1.x sink that requests multiple {@link ActionRequest ActionRequests} + * against a cluster for each incoming element. * * <p> - * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)} - * the sink will create a local {@link Node} for communicating with the - * Elasticsearch cluster. When using the second constructor - * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a {@link TransportClient} will - * be used instead. + * When using the first constructor {@link #ElasticsearchSink(java.util.Map, ElasticsearchSinkFunction)} + * the sink will create a local {@link Node} for communicating with the Elasticsearch cluster. When using the second + * constructor {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a + * {@link TransportClient} will be used instead. * * <p> * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster - * can be connected to. With the {@code Node Client} the sink will block and wait for a cluster + * can be connected to. When using the local {@code Node} for communicating, the sink will block and wait for a cluster * to come online. * * <p> - * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating - * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch - * documentation. An important setting is {@code cluster.name}, this should be set to the name - * of the cluster that the sink should emit to. 
+ * The {@link Map} passed to the constructor is used to create the {@link Node} or {@link TransportClient}. The config + * keys can be found in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is + * {@code cluster.name}, which should be set to the name of the cluster that the sink should emit to. * * <p> - * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}. + * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}. * This will buffer elements before sending a request to the cluster. The behaviour of the * {@code BulkProcessor} can be configured using these config keys: * <ul> @@ -80,236 +59,63 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder; * </ul> * * <p> - * You also have to provide an {@link IndexRequestBuilder}. This is used to create an - * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See - * {@link org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an example. + * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple + * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of + * {@link ElasticsearchSinkFunction} for an example. 
* - * @param <T> Type of the elements emitted by this sink + * @param <T> Type of the elements handled by this sink */ -public class ElasticsearchSink<T> extends RichSinkFunction<T> { - - public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; - public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; - public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; +public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class); - - /** - * The user specified config map that we forward to Elasticsearch when we create the Client. - */ - private final Map<String, String> userConfig; - - /** - * The list of nodes that the TransportClient should connect to. This is null if we are using - * an embedded Node to get a Client. - */ - private final List<TransportAddress> transportNodes; - - /** - * The builder that is used to construct an {@link IndexRequest} from the incoming element. - */ - private final IndexRequestBuilder<T> indexRequestBuilder; - - /** - * The embedded Node that is used to communicate with the Elasticsearch cluster. This is null - * if we are using a TransportClient. - */ - private transient Node node; - /** - * The Client that was either retrieved from a Node or is a TransportClient. - */ - private transient Client client; - - /** - * Bulk processor that was created using the client - */ - private transient BulkProcessor bulkProcessor; - - /** - * This is set from inside the BulkProcessor listener if there where failures in processing. - */ - private final AtomicBoolean hasFailure = new AtomicBoolean(false); - - /** - * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing. 
- */ - private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); - - /** - * Creates a new ElasticsearchSink that connects to the cluster using an embedded Node. + * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}. * - * @param userConfig The map of user settings that are passed when constructing the Node and BulkProcessor + * @param userConfig The map of user settings that are used when constructing the {@link Node} and {@link BulkProcessor} * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element + * + * @deprecated Deprecated since version 1.2, to be removed at version 2.0. + * Please use {@link ElasticsearchSink#ElasticsearchSink(Map, ElasticsearchSinkFunction)} instead. */ + @Deprecated public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) { - this.userConfig = userConfig; - this.indexRequestBuilder = indexRequestBuilder; - transportNodes = null; + this(userConfig, new IndexRequestBuilderWrapperFunction<>(indexRequestBuilder)); } /** - * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient. + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}. 
* - * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor - * @param transportNodes The Elasticsearch Nodes to which to connect using a {@code TransportClient} - * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element + * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor} + * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} + * @param indexRequestBuilder This is used to generate a {@link IndexRequest} from the incoming element * + * @deprecated Deprecated since 1.2, to be removed at 2.0. + * Please use {@link ElasticsearchSink#ElasticsearchSink(Map, List, ElasticsearchSinkFunction)} instead. */ - public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportNodes, IndexRequestBuilder<T> indexRequestBuilder) { - this.userConfig = userConfig; - this.indexRequestBuilder = indexRequestBuilder; - this.transportNodes = transportNodes; + @Deprecated + public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, IndexRequestBuilder<T> indexRequestBuilder) { + this(userConfig, transportAddresses, new IndexRequestBuilderWrapperFunction<>(indexRequestBuilder)); } /** - * Initializes the connection to Elasticsearch by either creating an embedded - * {@link org.elasticsearch.node.Node} and retrieving the - * {@link org.elasticsearch.client.Client} from it or by creating a - * {@link org.elasticsearch.client.transport.TransportClient}. + * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}. 
+ * + * @param userConfig The map of user settings that are used when constructing the embedded {@link Node} and {@link BulkProcessor} + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element */ - @Override - public void open(Configuration configuration) { - if (transportNodes == null) { - // Make sure that we disable http access to our embedded node - Settings settings = - ImmutableSettings.settingsBuilder() - .put(userConfig) - .put("http.enabled", false) - .build(); - - node = - nodeBuilder() - .settings(settings) - .client(true) - .data(false) - .node(); - - client = node.client(); - - if (LOG.isInfoEnabled()) { - LOG.info("Created Elasticsearch Client {} from embedded Node", client); - } - - } else { - Settings settings = ImmutableSettings.settingsBuilder() - .put(userConfig) - .build(); - - TransportClient transportClient = new TransportClient(settings); - for (TransportAddress transport: transportNodes) { - transportClient.addTransportAddress(transport); - } - - // verify that we actually are connected to a cluster - ImmutableList<DiscoveryNode> nodes = transportClient.connectedNodes(); - if (nodes.isEmpty()) { - throw new RuntimeException("Client is not connected to any Elasticsearch nodes!"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Connected to nodes: " + nodes.toString()); - } - } - - client = transportClient; - - if (LOG.isInfoEnabled()) { - LOG.info("Created Elasticsearch TransportClient {}", client); - } - } - - BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder( - client, - new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, - BulkRequest request) { - - } - - @Override - public void afterBulk(long executionId, - BulkRequest request, - BulkResponse response) { - if (response.hasFailures()) { - for (BulkItemResponse itemResp : response.getItems()) { - if (itemResp.isFailed()) { - LOG.error("Failed to index document in 
Elasticsearch: " + itemResp.getFailureMessage()); - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); - } - } - hasFailure.set(true); - } - } - - @Override - public void afterBulk(long executionId, - BulkRequest request, - Throwable failure) { - LOG.error(failure.getMessage()); - failureThrowable.compareAndSet(null, failure); - hasFailure.set(true); - } - }); - - // This makes flush() blocking - bulkProcessorBuilder.setConcurrentRequests(0); - - ParameterTool params = ParameterTool.fromMap(userConfig); - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { - bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) { - bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt( - CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) { - bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS))); - } - - bulkProcessor = bulkProcessorBuilder.build(); - } - - @Override - public void invoke(T element) { - IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext()); - - if (LOG.isDebugEnabled()) { - LOG.debug("Emitting IndexRequest: {}", indexRequest); - } - - bulkProcessor.add(indexRequest); + public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction); } - @Override - public void close() { - if (bulkProcessor != null) { - bulkProcessor.close(); - bulkProcessor = null; - } - - if (client != null) { - client.close(); - } - - if (node != null) { - node.close(); - } - - if (hasFailure.get()) { - Throwable cause = failureThrowable.get(); - if (cause != null) { - throw new RuntimeException("An error occured in ElasticsearchSink.", cause); - } else { - 
throw new RuntimeException("An error occured in ElasticsearchSink."); - - } - } + /** + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}. + * + * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor} + * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element + */ + public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction); } - } http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java index 04ae40a..18aa11e 100644 --- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -51,7 +51,11 @@ import java.io.Serializable; * }</pre> * * @param <T> The type of the element handled by this {@code IndexRequestBuilder} + * + * @deprecated Deprecated since version 1.2, to be removed at version 2.0. + * Please create a {@link ElasticsearchSink} using a {@link ElasticsearchSinkFunction} instead. */ +@Deprecated public interface IndexRequestBuilder<T> extends Function, Serializable { /** http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java new file mode 100644 index 0000000..6f1d138 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.api.common.functions.RuntimeContext; + +/** + * A dummy {@link ElasticsearchSinkFunction} that wraps a {@link IndexRequestBuilder}. + * This serves as a bridge for the usage deprecation of the {@code IndexRequestBuilder} interface. + */ +class IndexRequestBuilderWrapperFunction<T> implements ElasticsearchSinkFunction<T> { + + private static final long serialVersionUID = 289876038414250101L; + + private final IndexRequestBuilder<T> indexRequestBuilder; + + IndexRequestBuilderWrapperFunction(IndexRequestBuilder<T> indexRequestBuilder) { + this.indexRequestBuilder = indexRequestBuilder; + } + + @Override + public void process(T element, RuntimeContext ctx, RequestIndexer indexer) { + indexer.add(indexRequestBuilder.createIndexRequest(element, ctx)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java 
b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java index 33a2e47..3a7b113 100644 --- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java +++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,177 +18,149 @@ package org.apache.flink.streaming.connectors.elasticsearch; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; +import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit; +import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; -import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.node.Node; -import org.junit.Assert; -import org.junit.ClassRule; import 
org.junit.Test; -import org.junit.rules.TemporaryFolder; -import java.io.File; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.elasticsearch.node.NodeBuilder.nodeBuilder; +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { -public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase { + @Test + public void testTransportClient() throws Exception { + runTransportClientTest(); + } - private static final int NUM_ELEMENTS = 20; + @Test + public void testNullTransportClient() throws Exception { + runNullTransportClientTest(); + } - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); + @Test + public void testEmptyTransportClient() throws Exception { + runEmptyTransportClientTest(); + } @Test - public void testNodeClient() throws Exception{ + public void testTransportClientFails() throws Exception{ + runTransportClientFailsTest(); + } - File dataDir = tempFolder.newFolder(); + // -- Tests specific to Elasticsearch 1.x -- - Node node = nodeBuilder() - .settings(ImmutableSettings.settingsBuilder() - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-node-client-cluster") - .local(true) - .node(); + /** + * Tests that the Elasticsearch sink works properly using an embedded node to connect to Elasticsearch. 
+ */ + @Test + public void testEmbeddedNode() throws Exception { + final String index = "embedded-node-test-index"; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); + DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction()); - Map<String, String> config = Maps.newHashMap(); + Map<String, String> userConfig = new HashMap<>(); // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-node-client-cluster"); - - // connect to our local node - config.put("node.local", "true"); + userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + userConfig.put("cluster.name", CLUSTER_NAME); + userConfig.put("node.local", "true"); - source.addSink(new ElasticsearchSink<>(config, new TestIndexRequestBuilder())); - - env.execute("Elasticsearch Node Client Test"); + source.addSink(new ElasticsearchSink<>( + userConfig, + new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)) + ); + env.execute("Elasticsearch Embedded Node Test"); // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", - Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } + Client client = embeddedNodeEnv.getClient(); + SourceSinkDataTestKit.verifyProducedSinkData(client, index); - node.close(); + client.close(); } + /** + * Tests that behaviour of the deprecated {@link IndexRequestBuilder} constructor works properly. 
+ */ @Test - public void testTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); + public void testDeprecatedIndexRequestBuilderVariant() throws Exception { + final String index = "index-req-builder-test-index"; - Node node = nodeBuilder() - .settings(ImmutableSettings.settingsBuilder() - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-node-client-cluster") - .local(true) - .node(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction()); - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = Maps.newHashMap(); + Map<String, String> userConfig = new HashMap<>(); // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-node-client-cluster"); - - // connect to our local node - config.put("node.local", "true"); + userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + userConfig.put("cluster.name", CLUSTER_NAME); + userConfig.put("node.local", "true"); List<TransportAddress> transports = Lists.newArrayList(); transports.add(new LocalTransportAddress("1")); - source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder())); - - env.execute("Elasticsearch TransportClient Test"); + source.addSink(new ElasticsearchSink<>( + userConfig, + transports, + new TestIndexRequestBuilder(index)) + ); + env.execute("Elasticsearch Deprecated IndexRequestBuilder Bridge Test"); // verify the results - Client client = node.client(); - for (int 
i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", - Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } + Client client = embeddedNodeEnv.getClient(); + SourceSinkDataTestKit.verifyProducedSinkData(client, index); - node.close(); + client.close(); } - @Test(expected = JobExecutionException.class) - public void testTransportClientFails() throws Exception{ - // this checks whether the TransportClient fails early when there is no cluster to - // connect to. We don't hava such as test for the Node Client version since that - // one will block and wait for a cluster to come online - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); + @Override + protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + return new ElasticsearchSink<>(userConfig, ElasticsearchUtils.convertInetSocketAddresses(transportAddresses), elasticsearchSinkFunction); + } - Map<String, String> config = Maps.newHashMap(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-node-client-cluster"); + @Override + protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode( + Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception { - // connect to our local node - config.put("node.local", "true"); + // Elasticsearch 1.x requires this setting when using + // LocalTransportAddress to connect to a local embedded node + userConfig.put("node.local", "true"); List<TransportAddress> transports = 
Lists.newArrayList(); transports.add(new LocalTransportAddress("1")); - source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder())); - - env.execute("Elasticsearch Node Client Test"); + return new ElasticsearchSink<>( + userConfig, + transports, + elasticsearchSinkFunction); } - private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { + /** + * A {@link IndexRequestBuilder} with equivalent functionality to {@link SourceSinkDataTestKit.TestElasticsearchSinkFunction}. + */ + private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> { private static final long serialVersionUID = 1L; - private volatile boolean running = true; + private final String index; - @Override - public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { - for (int i = 0; i < NUM_ELEMENTS && running; i++) { - ctx.collect(Tuple2.of(i, "message #" + i)); - } - } - - @Override - public void cancel() { - running = false; + public TestIndexRequestBuilder(String index) { + this.index = index; } - } - - private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; @Override public IndexRequest createIndexRequest(Tuple2<Integer, String> element, RuntimeContext ctx) { @@ -196,10 +168,10 @@ public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase { json.put("data", element.f1); return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element.f0.toString()) - .source(json); + .index(index) + .type("flink-es-test-type") + .id(element.f0.toString()) + .source(json); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java 
---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java new file mode 100644 index 0000000..a0c809b --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.node.Node; + +import java.io.File; + +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + +/** + * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 1.x. + * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests. 
+ */ +public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment { + + private Node node; + + @Override + public void start(File tmpDataFolder, String clusterName) throws Exception { + if (node == null) { + node = nodeBuilder() + .settings(ImmutableSettings.settingsBuilder() + .put("http.enabled", false) + .put("path.data", tmpDataFolder.getAbsolutePath())) + .clusterName(clusterName) + .local(true) + .node(); + + node.start(); + } + } + + @Override + public void close() throws Exception { + if (node != null && !node.isClosed()) { + node.close(); + node = null; + } + } + + @Override + public Client getClient() { + if (node != null && !node.isClosed()) { + return node.client(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java deleted file mode 100644 index 136ae77..0000000 --- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.elasticsearch.examples; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink; -import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -/** - * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that - * you have a cluster named "elasticsearch" running or change the cluster name in the config map. 
- */ -public class ElasticsearchExample { - - public static void main(String[] args) throws Exception { - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<String> source = env.addSource(new SourceFunction<String>() { - private static final long serialVersionUID = 1L; - - private volatile boolean running = true; - - @Override - public void run(SourceContext<String> ctx) throws Exception { - for (int i = 0; i < 20 && running; i++) { - ctx.collect("message #" + i); - } - } - - @Override - public void cancel() { - running = false; - } - }); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() { - @Override - public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .source(json); - } - })); - - - env.execute("Elasticsearch Example"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java new file mode 100644 index 0000000..d697c3c --- /dev/null +++ 
b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.examples; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. 
Before running it you must ensure that + * you have a cluster named "elasticsearch" running or change the cluster name in the config map. + */ +public class ElasticsearchSinkExample { + + public static void main(String[] args) throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); + + Map<String, String> userConfig = new HashMap<>(); + userConfig.put("cluster.name", "elasticsearch"); + // This instructs the sink to emit after every element, otherwise they would be buffered + userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<TransportAddress> transports = new ArrayList<>(); + transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() { + @Override + public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { + indexer.add(createIndexRequest(element)); + } + })); + + + env.execute("Elasticsearch Sink Example"); + } + + private static IndexRequest createIndexRequest(String element) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .id(element) + .source(json); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties index dc20726..2055184 100644 --- 
a/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties +++ b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties @@ -16,12 +16,12 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=OFF, testlogger +log4j.rootLogger=INFO, testlogger log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.target=System.err log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n # suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/pom.xml b/flink-connectors/flink-connector-elasticsearch2/pom.xml index 7aba36e..30396dd 100644 --- a/flink-connectors/flink-connector-elasticsearch2/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch2/pom.xml @@ -52,17 +52,19 @@ under the License. 
</dependency> <dependency> - <groupId>org.elasticsearch</groupId> - <artifactId>elasticsearch</artifactId> - <version>${elasticsearch.version}</version> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-base_2.10</artifactId> + <version>${project.version}</version> </dependency> + <!-- Override Elasticsearch version in base from 1.x to 2.x --> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>${elasticsearch.version}</version> </dependency> - <!-- core dependencies --> + <!-- test dependencies --> <dependency> <groupId>org.apache.flink</groupId> @@ -78,6 +80,15 @@ under the License. <scope>test</scope> <type>test-jar</type> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-base_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java deleted file mode 100644 index 650931f..0000000 --- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch2; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.bulk.BulkProcessor; - -public class BulkProcessorIndexer implements RequestIndexer { - private final BulkProcessor bulkProcessor; - - public BulkProcessorIndexer(BulkProcessor bulkProcessor) { - this.bulkProcessor = bulkProcessor; - } - - @Override - public void add(ActionRequest... 
actionRequests) { - for (ActionRequest actionRequest : actionRequests) { - this.bulkProcessor.add(actionRequest); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java new file mode 100644 index 0000000..9407d9f --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch2; + +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge; +import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils; +import org.apache.flink.util.Preconditions; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +/** + * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x. + */ +public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge { + + private static final long serialVersionUID = 2638252694744361079L; + + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch2ApiCallBridge.class); + + /** + * User-provided transport addresses. + * + * We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 2.x. 
+ */ + private final List<InetSocketAddress> transportAddresses; + + Elasticsearch2ApiCallBridge(List<InetSocketAddress> transportAddresses) { + Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty()); + this.transportAddresses = transportAddresses; + } + + @Override + public Client createClient(Map<String, String> clientConfig) { + Settings settings = Settings.settingsBuilder().put(clientConfig).build(); + + TransportClient transportClient = TransportClient.builder().settings(settings).build(); + for (TransportAddress address : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) { + transportClient.addTransportAddress(address); + } + + // verify that we actually are connected to a cluster + if (transportClient.connectedNodes().isEmpty()) { + throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!"); + } + + if (LOG.isInfoEnabled()) { + LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes()); + } + + return transportClient; + } + + @Override + public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { + if (!bulkItemResponse.isFailed()) { + return null; + } else { + return bulkItemResponse.getFailure().getCause(); + } + } + + @Override + public void cleanup() { + // nothing to cleanup + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java index e839589..a0abc51 100644 --- 
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java @@ -16,55 +16,30 @@ */ package org.apache.flink.streaming.connectors.elasticsearch2; -import com.google.common.collect.ImmutableList; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.util.Preconditions; -import org.elasticsearch.action.bulk.BulkItemResponse; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; /** - * Sink that emits its input elements in bulk to an Elasticsearch cluster. 
+ * Elasticsearch 2.x sink that requests multiple {@link ActionRequest ActionRequests} + * against a cluster for each incoming element. * * <p> - * When using the second constructor - * {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a {@link TransportClient} will - * be used. + * The sink internally uses a {@link TransportClient} to communicate with an Elasticsearch cluster. + * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor. * * <p> - * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster - * can be connected to. + * The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found + * in the <a href="https://www.elastic.co">Elasticsearch documentation</a>. An important setting is {@code cluster.name}, + * which should be set to the name of the cluster that the sink should emit to. * * <p> - * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating - * {@link TransportClient}. The config keys can be found in the Elasticsearch - * documentation. An important setting is {@code cluster.name}, this should be set to the name - * of the cluster that the sink should emit to. - * - * <p> - * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}. + * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}. * This will buffer elements before sending a request to the cluster. The behaviour of the * {@code BulkProcessor} can be configured using these config keys: * <ul> @@ -75,183 +50,41 @@ import java.util.concurrent.atomic.AtomicReference; * </ul> * * <p> - * You also have to provide an {@link RequestIndexer}. This is used to create an - * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See - * {@link RequestIndexer} for an example.
+ * You also have to provide an {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction}. + * This is used to create multiple {@link ActionRequest ActionRequests} for each incoming element. See the class level + * documentation of {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} for an example. * - * @param <T> Type of the elements emitted by this sink + * @param <T> Type of the elements handled by this sink */ -public class ElasticsearchSink<T> extends RichSinkFunction<T> { - - public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; - public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; - public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; +public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class); - - /** - * The user specified config map that we forward to Elasticsearch when we create the Client. - */ - private final Map<String, String> userConfig; - - /** - * The list of nodes that the TransportClient should connect to. This is null if we are using - * an embedded Node to get a Client. - */ - private final List<InetSocketAddress> transportAddresses; - - /** - * The builder that is used to construct an {@link IndexRequest} from the incoming element. - */ - private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction; - - /** - * The Client that was either retrieved from a Node or is a TransportClient. 
- */ - private transient Client client; - - /** - * Bulk processor that was created using the client - */ - private transient BulkProcessor bulkProcessor; - - /** - * Bulk {@link org.elasticsearch.action.ActionRequest} indexer - */ - private transient RequestIndexer requestIndexer; - - /** - * This is set from inside the BulkProcessor listener if there where failures in processing. - */ - private final AtomicBoolean hasFailure = new AtomicBoolean(false); - - /** - * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing. - */ - private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); - /** - * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient. + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}. * - * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor - * @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient} - * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element + * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor} + * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element * + * @deprecated Deprecated since 1.2, to be removed at 2.0. + * Please use {@link ElasticsearchSink#ElasticsearchSink(Map, List, org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction)} instead. 
*/ + @Deprecated public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - this.userConfig = userConfig; - this.elasticsearchSinkFunction = elasticsearchSinkFunction; - Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0); - this.transportAddresses = transportAddresses; + this(userConfig, transportAddresses, new OldNewElasticsearchSinkFunctionBridge<>(elasticsearchSinkFunction)); } /** - * Initializes the connection to Elasticsearch by creating a - * {@link org.elasticsearch.client.transport.TransportClient}. + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}. + * + * @param userConfig The map of user settings that are passed when constructing the {@link TransportClient} and {@link BulkProcessor} + * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element */ - @Override - public void open(Configuration configuration) { - List<TransportAddress> transportNodes; - transportNodes = new ArrayList<>(transportAddresses.size()); - for (InetSocketAddress address : transportAddresses) { - transportNodes.add(new InetSocketTransportAddress(address)); - } - - Settings settings = Settings.settingsBuilder().put(userConfig).build(); - - TransportClient transportClient = TransportClient.builder().settings(settings).build(); - for (TransportAddress transport: transportNodes) { - transportClient.addTransportAddress(transport); - } - - // verify that we actually are connected to a cluster - ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes()); - if (nodes.isEmpty()) { - throw new RuntimeException("Client is not connected to any Elasticsearch nodes!"); - } - - client = 
transportClient; - - if (LOG.isInfoEnabled()) { - LOG.info("Created Elasticsearch TransportClient {}", client); - } - - BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - for (BulkItemResponse itemResp : response.getItems()) { - if (itemResp.isFailed()) { - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); - } - } - hasFailure.set(true); - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - LOG.error(failure.getMessage()); - failureThrowable.compareAndSet(null, failure); - hasFailure.set(true); - } - }); - - // This makes flush() blocking - bulkProcessorBuilder.setConcurrentRequests(0); - - ParameterTool params = ParameterTool.fromMap(userConfig); - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { - bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) { - bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt( - CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) { - bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS))); - } - - bulkProcessor = bulkProcessorBuilder.build(); - requestIndexer = new BulkProcessorIndexer(bulkProcessor); - } - - @Override - public void invoke(T element) { - elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer); + public ElasticsearchSink(Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction); } - - @Override - public void close() { - if (bulkProcessor != null) { - bulkProcessor.close(); - bulkProcessor = null; - } - - if (client != null) { - client.close(); - } - - if (hasFailure.get()) { - Throwable cause = failureThrowable.get(); - if (cause != null) { - throw new RuntimeException("An error occured in ElasticsearchSink.", cause); - } else { - throw new RuntimeException("An error occured in ElasticsearchSink."); - } - } - - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java index 55ba720..c474390 100644 --- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,7 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import java.io.Serializable; /** - * Method that creates an {@link org.elasticsearch.action.ActionRequest} from an element in a Stream. + * Method that creates multiple {@link org.elasticsearch.action.ActionRequest}s from an element in a Stream. * * <p> * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch. @@ -54,7 +53,12 @@ import java.io.Serializable; * }</pre> * * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction} + * + * @deprecated Deprecated since 1.2, to be removed at 2.0. + * This class has been deprecated due to package relocation. + * Please use {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} instead. 
*/ +@Deprecated public interface ElasticsearchSinkFunction<T> extends Serializable, Function { void process(T element, RuntimeContext ctx, RequestIndexer indexer); } http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java new file mode 100644 index 0000000..c95fff5 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch2; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; + +/** + * A dummy {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} to bridge + * the migration from the deprecated {@link ElasticsearchSinkFunction}. + */ +class OldNewElasticsearchSinkFunctionBridge<T> implements org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> { + + private static final long serialVersionUID = 2415651895272659448L; + + private final ElasticsearchSinkFunction<T> deprecated; + private OldNewRequestIndexerBridge reusedRequestIndexerBridge; + + OldNewElasticsearchSinkFunctionBridge(ElasticsearchSinkFunction<T> deprecated) { + this.deprecated = deprecated; + } + + @Override + public void process(T element, RuntimeContext ctx, RequestIndexer indexer) { + if (reusedRequestIndexerBridge == null) { + reusedRequestIndexerBridge = new OldNewRequestIndexerBridge(indexer); + } + deprecated.process(element, ctx, reusedRequestIndexerBridge); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java new file mode 100644 index 0000000..f42fb44 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch2; + +import org.elasticsearch.action.ActionRequest; + +/** + * A dummy {@link org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer} to bridge + * the migration from the deprecated {@link RequestIndexer}. + */ +class OldNewRequestIndexerBridge implements RequestIndexer { + + private static final long serialVersionUID = 4213982619497149416L; + + private final org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer requestIndexer; + + OldNewRequestIndexerBridge(org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer requestIndexer) { + this.requestIndexer = requestIndexer; + } + + @Override + public void add(ActionRequest... 
actionRequests) { + requestIndexer.add(actionRequests); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java index 144a87b..b2b3de4 100644 --- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java @@ -20,6 +20,12 @@ import org.elasticsearch.action.ActionRequest; import java.io.Serializable; +/** + * @deprecated Deprecated since 1.2, to be removed at 2.0. + * This class has been deprecated due to package relocation. + * Please use {@link org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer} instead. + */ +@Deprecated public interface RequestIndexer extends Serializable { void add(ActionRequest... 
actionRequests); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java new file mode 100644 index 0000000..ddf3bd6 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkITCase; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; + +import java.io.File; + +/** + * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 2.x. + * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests. + */ +public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment { + + private Node node; + + @Override + public void start(File tmpDataFolder, String clusterName) throws Exception { + if (node == null) { + node = NodeBuilder.nodeBuilder().settings( + Settings.settingsBuilder() + .put("path.home", tmpDataFolder.getParent()) + .put("http.enabled", false) + .put("path.data", tmpDataFolder.getAbsolutePath())) + .clusterName(clusterName) + .node(); + + node.start(); + } + } + + @Override + public void close() throws Exception { + if (node != null && !node.isClosed()) { + node.close(); + node = null; + } + } + + @Override + public Client getClient() { + if (node != null && !node.isClosed()) { + return node.client(); + } else { + return null; + } + } + +}
