CAMEL-11868: New ElasticSearch5 REST component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/928f185f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/928f185f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/928f185f Branch: refs/heads/master Commit: 928f185f9c5fcb1eb48d8f626b7f569210bca5f7 Parents: 24ae294 Author: fharms <[email protected]> Authored: Mon Oct 16 19:26:43 2017 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Oct 16 20:00:24 2017 +0200 ---------------------------------------------------------------------- components/camel-elasticsearch5-rest/pom.xml | 120 ++++++++ .../elasticsearch5/ElasticsearchComponent.java | 233 +++++++++++++++ .../ElasticsearchConfiguration.java | 245 +++++++++++++++ .../elasticsearch5/ElasticsearchConstants.java | 37 +++ .../elasticsearch5/ElasticsearchEndpoint.java | 60 ++++ .../elasticsearch5/ElasticsearchOperation.java | 55 ++++ .../elasticsearch5/ElasticsearchProducer.java | 298 +++++++++++++++++++ .../BulkRequestAggregationStrategy.java | 52 ++++ .../ElasticsearchActionRequestConverter.java | 206 +++++++++++++ .../apache/camel/component/elasticsearch5-rest | 18 ++ .../elasticsearch5/ElasticsearchBaseTest.java | 124 ++++++++ .../elasticsearch5/ElasticsearchBulkTest.java | 113 +++++++ .../ElasticsearchClusterBaseTest.java | 120 ++++++++ .../ElasticsearchClusterIndexTest.java | 90 ++++++ ...icsearchGetSearchDeleteExistsUpdateTest.java | 288 ++++++++++++++++++ .../elasticsearch5/ElasticsearchIndexTest.java | 90 ++++++ .../src/test/resources/log4j2.properties | 7 + components/pom.xml | 1 + parent/pom.xml | 3 +- .../spring-boot/components-starter/pom.xml | 1 + 20 files changed, 2160 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/pom.xml b/components/camel-elasticsearch5-rest/pom.xml new file mode 100644 index 0000000..77c4516 --- /dev/null +++ b/components/camel-elasticsearch5-rest/pom.xml @@ -0,0 +1,120 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.20.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.camel</groupId> + <artifactId>camel-elasticsearch5-rest</artifactId> + <packaging>jar</packaging> + <name>Camel :: ElasticSearch5 :: REST</name> + <description>Camel ElasticSearch 5.x REST support</description> + + <properties> + <elasticsearch.version>${elasticsearch5-version}</elasticsearch.version> + <camel.osgi.export.pkg>org.apache.camel.component.elasticsearch5.*;${camel.osgi.version}</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=elasticsearch-rest</camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>elasticsearch-rest-high-level-client</artifactId> + <version>${elasticsearch5-version}</version> + </dependency> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>elasticsearch-rest-client-sniffer</artifactId> + <version>${elasticsearch-sniffer-version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson2-version}</version> + </dependency> + + <!-- for testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>transport</artifactId> + <version>${elasticsearch5-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.codelibs</groupId> + <artifactId>elasticsearch-cluster-runner</artifactId> + <version>${elasticsearch5-cluster-runner-version}</version> + <scope>test</scope> + </dependency> + + <!-- logging --> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-1.2-api</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <es.path.data>target/data</es.path.data> + </systemPropertyVariables> + <forkCount>1</forkCount> + <reuseForks>false</reuseForks> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java new file mode 100644 index 0000000..79eca0b --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elasticsearch5; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.DefaultComponent; +import org.apache.camel.spi.Metadata; +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; + +/** + * Represents the component that manages {@link ElasticsearchEndpoint}. + */ +public class ElasticsearchComponent extends DefaultComponent { + + @Metadata(label = "advanced") + private RestClient client; + @Metadata(label = "advanced") + private String hostAddresses; + @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT) + private int socketTimeout = ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT; + @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.MAX_RETRY_TIMEOUT) + private int maxRetryTimeout = ElasticsearchConstants.MAX_RETRY_TIMEOUT; + @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT) + private int connectionTimeout = ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT; + + @Metadata(label = "advance") + private String user; + @Metadata(secret = true) + private String password; + @Metadata(label = "advanced", defaultValue = "false") + private boolean enableSSL; + @Metadata(label = "advanced", defaultValue = "false") + private boolean enableSniffer; + @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.DEFAULT_SNIFFER_INTERVAL) + private int snifferInterval = ElasticsearchConstants.DEFAULT_SNIFFER_INTERVAL; + @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.DEFAULT_AFTER_FAILURE_DELAY) + private int sniffAfterFailureDelay = ElasticsearchConstants.DEFAULT_AFTER_FAILURE_DELAY; + + public ElasticsearchComponent() { + super(); + } + + public ElasticsearchComponent(CamelContext context) { + super(context); + } + + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + ElasticsearchConfiguration config = new ElasticsearchConfiguration(); + config.setHostAddresses(this.getHostAddresses()); + config.setSocketTimeout(this.getSocketTimeout()); + config.setMaxRetryTimeout(this.getMaxRetryTimeout()); + config.setConnectionTimeout(this.getConnectionTimeout()); + config.setUser(this.getUser()); + config.setEnableSSL(this.getEnableSSL()); + config.setPassword(this.getPassword()); + config.setEnableSniffer(this.getEnableSniffer()); + config.setSnifferInterval(this.getSnifferInterval()); + config.setSniffAfterFailureDelay(this.getSniffAfterFailureDelay()); + config.setClusterName(remaining); + + setProperties(config, parameters); + config.setHostAddressesList(parseHostAddresses(config.getHostAddresses(), config)); + + Endpoint endpoint = new ElasticsearchEndpoint(uri, this, config, client); + return endpoint; + } + + private List<HttpHost> parseHostAddresses(String ipsString, ElasticsearchConfiguration config) throws UnknownHostException { + if (ipsString == null || ipsString.isEmpty()) { + return null; + } + List<String> addressesStr = Arrays.asList(ipsString.split(",")); + List<HttpHost> addressesTrAd = new ArrayList<>(addressesStr.size()); + for (String address : addressesStr) { + String[] split = address.split(":"); + String hostname; + if (split.length > 0) { + hostname = split[0]; + } else { + throw new IllegalArgumentException(); + } + Integer port = split.length > 1 ? Integer.parseInt(split[1]) : ElasticsearchConstants.DEFAULT_PORT; + addressesTrAd.add(new HttpHost(hostname, port, config.getEnableSSL() ? "HTTPS" : "HTTP")); + } + return addressesTrAd; + } + + public RestClient getClient() { + return client; + } + + /** + * To use an existing configured Elasticsearch client, instead of creating a client per endpoint. + * This allow to customize the client with specific settings. + */ + public void setClient(RestClient client) { + this.client = client; + } + /** + * Comma separated list with ip:port formatted remote transport addresses to use. + * The ip and port options must be left blank for hostAddresses to be considered instead. + */ + public String getHostAddresses() { + return hostAddresses; + } + + public void setHostAddresses(String hostAddresses) { + this.hostAddresses = hostAddresses; + } + + /** + * The timeout in ms to wait before the socket will timeout. + */ + public int getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + } + + /** + * The time in ms to wait before connection will timeout. + */ + public int getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + /** + * Basic authenticate user + */ + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + /** + * Password for authenticate + */ + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + /** + * Enable SSL + */ + public Boolean getEnableSSL() { + return enableSSL; + } + + public void setEnableSSL(Boolean enableSSL) { + this.enableSSL = enableSSL; + } + + /** + * The time in ms before retry + */ + public int getMaxRetryTimeout() { + return maxRetryTimeout; + } + + public void setMaxRetryTimeout(int maxRetryTimeout) { + this.maxRetryTimeout = maxRetryTimeout; + } + + /** + * Enable automatically discover nodes from a running Elasticsearch cluster + */ + public Boolean getEnableSniffer() { + return enableSniffer; + } + + public void setEnableSniffer(Boolean enableSniffer) { + this.enableSniffer = enableSniffer; + } + + /** + * The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when + * sniffOnFailure is disabled or when there are no failures between consecutive sniff executions + */ + public int getSnifferInterval() { + return snifferInterval; + } + + public void setSnifferInterval(int snifferInterval) { + this.snifferInterval = snifferInterval; + } + + /** + * The delay of a sniff execution scheduled after a failure (in milliseconds) + */ + public int getSniffAfterFailureDelay() { + return sniffAfterFailureDelay; + } + + public void setSniffAfterFailureDelay(int sniffAfterFailureDelay) { + this.sniffAfterFailureDelay = sniffAfterFailureDelay; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java new file mode 100644 index 0000000..e1f109b --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elasticsearch5; + +import java.util.List; + +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; +import org.apache.camel.spi.UriPath; +import org.apache.http.HttpHost; + +@UriParams +public class ElasticsearchConfiguration { + + private List<HttpHost> hostAddressesList; + + @UriPath @Metadata(required = "true") + private String clusterName; + @UriParam + private ElasticsearchOperation operation; + @UriParam + private String indexName; + @UriParam + private String indexType; + @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_FOR_WAIT_ACTIVE_SHARDS) + private int waitForActiveShards = ElasticsearchConstants.DEFAULT_FOR_WAIT_ACTIVE_SHARDS; + @UriParam @Metadata(required = "true") + private String hostAddresses; + @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT) + private int socketTimeout = ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT; + @UriParam(defaultValue = "" + ElasticsearchConstants.MAX_RETRY_TIMEOUT) + private int maxRetryTimeout = ElasticsearchConstants.MAX_RETRY_TIMEOUT; + @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT) + private int connectionTimeout = ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT; + @UriParam(defaultValue = "false") + private boolean disconnect; + + private String user; + private String password; + private boolean enableSSL; + //Sniffer parameter. + private boolean enableSniffer; + private int snifferInterval = ElasticsearchConstants.DEFAULT_SNIFFER_INTERVAL; + private int sniffAfterFailureDelay = ElasticsearchConstants.DEFAULT_AFTER_FAILURE_DELAY; + /** + * Name of the cluster + */ + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + /** + * What operation to perform + */ + public ElasticsearchOperation getOperation() { + return operation; + } + + public void setOperation(ElasticsearchOperation operation) { + this.operation = operation; + } + + /** + * The name of the index to act against + */ + public String getIndexName() { + return indexName; + } + + public void setIndexName(String indexName) { + this.indexName = indexName; + } + + /** + * The type of the index to act against + */ + public String getIndexType() { + return indexType; + } + + public void setIndexType(String indexType) { + this.indexType = indexType; + } + + /** + * Comma separated list with ip:port formatted remote transport addresses to use. + * The ip and port options must be left blank for hostAddresses to be considered instead. + */ + public String getHostAddresses() { + return hostAddresses; + } + + public void setHostAddresses(String hostAddresses) { + this.hostAddresses = hostAddresses; + } + + /** + * Index creation waits for the write consistency number of shards to be available + */ + public int getWaitForActiveShards() { + return waitForActiveShards; + } + + public void setWaitForActiveShards(int waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + } + + public List<HttpHost> getHostAddressesList() { + return hostAddressesList; + } + + public void setHostAddressesList(List<HttpHost> hostAddressesList) { + this.hostAddressesList = hostAddressesList; + } + + /** + * The timeout in ms to wait before the socket will timeout. + */ + public int getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + } + + /** + * The time in ms to wait before connection will timeout. + */ + public int getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + /** + * Basic authenticate user + */ + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + /** + * Password for authenticate + */ + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + /** + * Enable SSL + */ + public Boolean getEnableSSL() { + return enableSSL; + } + + public void setEnableSSL(Boolean enableSSL) { + this.enableSSL = enableSSL; + } + + /** + * The time in ms before retry + */ + public int getMaxRetryTimeout() { + return maxRetryTimeout; + } + + public void setMaxRetryTimeout(int maxRetryTimeout) { + this.maxRetryTimeout = maxRetryTimeout; + } + + /** + * Disconnect after it finish calling the producer + */ + public Boolean getDisconnect() { + return disconnect; + } + + public void setDisconnect(Boolean disconnect) { + this.disconnect = disconnect; + } + + /** + * Enable automatically discover nodes from a running Elasticsearch cluster + */ + public Boolean getEnableSniffer() { + return enableSniffer; + } + + public void setEnableSniffer(Boolean enableSniffer) { + this.enableSniffer = enableSniffer; + } + + /** + * The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when + * sniffOnFailure is disabled or when there are no failures between consecutive sniff executions + */ + public int getSnifferInterval() { + return snifferInterval; + } + + public void setSnifferInterval(int snifferInterval) { + this.snifferInterval = snifferInterval; + } + + /** + * The delay of a sniff execution scheduled after a failure (in milliseconds) + */ + public int getSniffAfterFailureDelay() { + return sniffAfterFailureDelay; + } + + public void setSniffAfterFailureDelay(int sniffAfterFailureDelay) { + this.sniffAfterFailureDelay = sniffAfterFailureDelay; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java new file mode 100644 index 0000000..9f2d493 --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.elasticsearch5; + + +public interface ElasticsearchConstants { + + String PARAM_OPERATION = "operation"; + String PARAM_INDEX_ID = "indexId"; + String PARAM_INDEX_NAME = "indexName"; + String PARAM_INDEX_TYPE = "indexType"; + String PARAM_WAIT_FOR_ACTIVE_SHARDS = "waitForActiveShards"; + + int DEFAULT_PORT = 9200; + int DEFAULT_FOR_WAIT_ACTIVE_SHARDS = 1; // Meaning only wait for the primary shard + int DEFAULT_SOCKET_TIMEOUT = 30000; // Meaning how long time to wait before the socket timeout + int MAX_RETRY_TIMEOUT = 30000; // Meaning how long to wait before retry again + int DEFAULT_CONNECTION_TIMEOUT = 30000; // Meaning how many seconds before it timeout when establish connection + int DEFAULT_SNIFFER_INTERVAL = 60000 * 5; // Meaning how often it should search for elasticsearch nodes + int DEFAULT_AFTER_FAILURE_DELAY = 60000; // Meaning when should the sniff execution scheduled after a failure + +} http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java new file mode 100644 index 0000000..37bafd6 --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elasticsearch5; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.elasticsearch.client.RestClient; + +/** + * The elasticsearch component is used for interfacing with ElasticSearch server using 5.x REST API. + */ +@UriEndpoint(firstVersion = "2.21.0", scheme = "elasticsearch5-rest", title = "Elastichsearch Rest or Elasticsearch 5 Rest", + syntax = "elasticsearch5-rest:clusterName", producerOnly = true, label = "monitoring,search") +public class ElasticsearchEndpoint extends DefaultEndpoint { + + @UriParam + protected final ElasticsearchConfiguration configuration; + + private RestClient client; + + public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, ElasticsearchConfiguration config, RestClient client) throws Exception { + super(uri, component); + this.configuration = config; + this.client = client; + } + + public Producer createProducer() throws Exception { + return new ElasticsearchProducer(this, configuration); + } + + public Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("Cannot consume from an ElasticsearchEndpoint: " + getEndpointUri()); + } + + public boolean isSingleton() { + return true; + } + + public RestClient getClient() { + return client; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java new file mode 100644 index 0000000..11b57f8 --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elasticsearch5; + +/** + * The ElasticSearch server operations list which are implemented + * + * Index - Index a document associated with a given index and type + * Update - Updates a document based on a script + * Bulk - Executes a bulk of index / delete operations + * BulkIndex - Executes a bulk of index / delete operations + * GetById - Gets the document that was indexed from an index with a type and id + * MultiGet - Multiple get documents + * Delete - Deletes a document from the index based on the index, type and id + * Search - Search across one or more indices and one or more types with a query + * Exists - Checks the index exists or not (using search with size=0 and terminate_after=1 parameters) + * + */ +public enum ElasticsearchOperation { + Index("Index"), + Update("Update"), + Bulk("Bulk"), + BulkIndex("BulkIndex"), + GetById("GetById"), + MultiGet("MultiGet"), + Delete("Delete"), + DeleteIndex("DeleteIndex"), + Search("Search"), + Exists("Exists"); + + private final String text; + + ElasticsearchOperation(final String text) { + this.text = text; + } + + @Override + public String toString() { + return text; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java new file mode 100644 index 0000000..f87fb54 --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elasticsearch5; + +import java.lang.reflect.InvocationTargetException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.component.elasticsearch5.converter.ElasticsearchActionRequestConverter; +import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.IOHelper; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.MultiGetRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.sniff.Sniffer; +import org.elasticsearch.client.sniff.SnifferBuilder; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Represents an Elasticsearch producer. + */ +public class ElasticsearchProducer extends DefaultProducer { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchProducer.class); + + protected final ElasticsearchConfiguration configuration; + private RestClient client; + private Sniffer sniffer; + + public ElasticsearchProducer(ElasticsearchEndpoint endpoint, ElasticsearchConfiguration configuration) { + super(endpoint); + this.configuration = configuration; + this.client = endpoint.getClient(); + } + + private ElasticsearchOperation resolveOperation(Exchange exchange) { + // 1. Operation can be driven by either (in order of preference): + // a. If the body is an ActionRequest the operation is set by the type + // of request. + // b. If the body is not an ActionRequest, the operation is set by the + // header if it exists. + // c. If neither the operation can not be derived from the body or + // header, the configuration is used. + // In the event we can't discover the operation from a, b or c we throw + // an error. + Object request = exchange.getIn().getBody(); + if (request instanceof IndexRequest) { + return ElasticsearchOperation.Index; + } else if (request instanceof GetRequest) { + return ElasticsearchOperation.GetById; + } else if (request instanceof MultiGetRequest) { + return ElasticsearchOperation.MultiGet; + } else if (request instanceof UpdateRequest) { + return ElasticsearchOperation.Update; + } else if (request instanceof BulkRequest) { + // do we want bulk or bulk_index? + if (configuration.getOperation() == ElasticsearchOperation.BulkIndex) { + return configuration.getOperation().BulkIndex; + } else { + return configuration.getOperation().Bulk; + } + } else if (request instanceof DeleteRequest) { + return ElasticsearchOperation.Delete; + } else if (request instanceof SearchRequest) { + return ElasticsearchOperation.Search; + } else if (request instanceof DeleteIndexRequest) { + return ElasticsearchOperation.DeleteIndex; + } + + ElasticsearchOperation operationConfig = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchOperation.class); + if (operationConfig == null) { + operationConfig = configuration.getOperation(); + } + if (operationConfig == null) { + throw new IllegalArgumentException(ElasticsearchConstants.PARAM_OPERATION + " value '" + operationConfig + "' is not supported"); + } + return operationConfig; + } + + public void process(Exchange exchange) throws Exception { + if (configuration.getDisconnect() && client == null) { + startClient(); + } + RestHighLevelClient restHighLevelClient = new RestHighLevelClient(client); + // 2. Index and type will be set by: + // a. If the incoming body is already an action request + // b. If the body is not an action request we will use headers if they + // are set. + // c. If the body is not an action request and the headers aren't set we + // will use the configuration. + // No error is thrown by the component in the event none of the above + // conditions are met. The java es client + // will throw. + + Message message = exchange.getIn(); + final ElasticsearchOperation operation = resolveOperation(exchange); + + // Set the index/type headers on the exchange if necessary. This is used + // for type conversion. + boolean configIndexName = false; + String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class); + if (indexName == null) { + message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, configuration.getIndexName()); + configIndexName = true; + } + + boolean configIndexType = false; + String indexType = message.getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class); + if (indexType == null) { + message.setHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, configuration.getIndexType()); + configIndexType = true; + } + + boolean configWaitForActiveShards = false; + Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class); + if (waitForActiveShards == null) { + message.setHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards()); + configWaitForActiveShards = true; + } + + if (operation == ElasticsearchOperation.Index) { + IndexRequest indexRequest = ElasticsearchActionRequestConverter.toIndexRequest(message.getBody(), exchange); + message.setBody(restHighLevelClient.index(indexRequest).getId()); + } else if (operation == ElasticsearchOperation.Update) { + UpdateRequest updateRequest = ElasticsearchActionRequestConverter.toUpdateRequest(message.getBody(Map.class), exchange); + message.setBody(restHighLevelClient.update(updateRequest).getId()); + } else if (operation == ElasticsearchOperation.GetById) { + GetRequest getRequest = ElasticsearchActionRequestConverter.toGetRequest(message.getBody(), exchange); + message.setBody(restHighLevelClient.get(getRequest)); + } else if (operation == ElasticsearchOperation.Bulk) { + BulkRequest bulkRequest = message.getBody(BulkRequest.class); + message.setBody(restHighLevelClient.bulk(bulkRequest).getItems()); + } else if (operation == ElasticsearchOperation.BulkIndex) { + BulkRequest bulkRequest = ElasticsearchActionRequestConverter.toBulkRequest(message.getBody(), exchange); + List<String> indexedIds = Arrays.stream(restHighLevelClient.bulk(bulkRequest).getItems()) + .map(BulkItemResponse::getId) + .collect(Collectors.toList()); + message.setBody(indexedIds); + } else if (operation == ElasticsearchOperation.Delete) { + DeleteRequest deleteRequest = ElasticsearchActionRequestConverter.toDeleteRequest(message.getBody(), exchange); + message.setBody(restHighLevelClient.delete(deleteRequest).getResult()); + } else if (operation == ElasticsearchOperation.DeleteIndex) { + DeleteRequest deleteRequest = ElasticsearchActionRequestConverter.toDeleteRequest(message.getBody(), exchange); + message.setBody(client.performRequest("Delete", deleteRequest.index()).getStatusLine().getStatusCode()); + } else if (operation == ElasticsearchOperation.Exists) { + // ExistsRequest API is deprecated, using SearchRequest instead with size=0 and terminate_after=1 + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(0); + sourceBuilder.terminateAfter(1); + SearchRequest searchRequest = new SearchRequest(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class)); + searchRequest.source(sourceBuilder); + try { + restHighLevelClient.search(searchRequest); + message.setBody(true); + } catch (ElasticsearchStatusException e) { + if (e.status().equals(RestStatus.NOT_FOUND)) { + message.setBody(false); + } else { + throw new IllegalStateException(e); + } + + } + } else if (operation == ElasticsearchOperation.Search) { + SearchRequest searchRequest = ElasticsearchActionRequestConverter.toSearchRequest(message.getBody(), exchange); + message.setBody(restHighLevelClient.search(searchRequest).getHits()); + } else { + throw new IllegalArgumentException(ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported"); + } + + // If we set params via the configuration on this exchange, remove them + // now. This preserves legacy behavior for this component and enables a + // use case where one message can be sent to multiple elasticsearch + // endpoints where the user is relying on the endpoint configuration + // (index/type) rather than header values. If we do not clear this out + // sending the same message (index request, for example) to multiple + // elasticsearch endpoints would have the effect overriding any + // subsequent endpoint index/type with the first endpoint index/type. + if (configIndexName) { + message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME); + } + + if (configIndexType) { + message.removeHeader(ElasticsearchConstants.PARAM_INDEX_TYPE); + } + + if (configWaitForActiveShards) { + message.removeHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS); + } + if (configuration.getDisconnect()) { + IOHelper.close(client); + client = null; + if (configuration.getEnableSniffer()) { + IOHelper.close(sniffer); + sniffer = null; + } + } + + } + + @Override + @SuppressWarnings("unchecked") + protected void doStart() throws Exception { + super.doStart(); + if (!configuration.getDisconnect()) { + startClient(); + } + } + + private void startClient() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException, UnknownHostException { + if (client == null) { + LOG.info("Connecting to the ElasticSearch cluster: " + configuration.getClusterName()); + if (configuration.getHostAddressesList() != null + && !configuration.getHostAddressesList().isEmpty()) { + client = createClient(); + } else { + LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster"); + } + } + } + + private RestClient createClient() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { + final RestClientBuilder builder = RestClient.builder(configuration.getHostAddressesList().toArray(new HttpHost[0])); + builder.setMaxRetryTimeoutMillis(configuration.getMaxRetryTimeout()); + builder.setRequestConfigCallback(requestConfigBuilder -> + requestConfigBuilder.setConnectTimeout(configuration.getConnectionTimeout()).setSocketTimeout(configuration.getSocketTimeout())); + if (configuration.getUser() != null && configuration.getPassword() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(configuration.getUser(), configuration.getPassword())); + builder.setHttpClientConfigCallback(httpClientBuilder -> { + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + return httpClientBuilder; + }); + } + final RestClient restClient = builder.build(); + if (configuration.getEnableSniffer()) { + SnifferBuilder snifferBuilder = Sniffer.builder(restClient); + snifferBuilder.setSniffIntervalMillis(configuration.getSnifferInterval()); + snifferBuilder.setSniffAfterFailureDelayMillis(configuration.getSniffAfterFailureDelay()); + sniffer = snifferBuilder.build(); + } + return restClient; + } + + + @Override + protected void doStop() throws Exception { + if (client != null) { + LOG.info("Disconnecting from ElasticSearch cluster: {}", configuration.getClusterName()); + client.close(); + if (sniffer != null) { + sniffer.close(); + } + } + super.doStop(); + } + + public RestClient getClient() { + return client; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java new file mode 100644 index 0000000..9f60eb4 --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elasticsearch5.aggregation; + +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadRuntimeException; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkRequest; + +/** + * Aggregates two {@link ActionRequest}s into a single {@link BulkRequest}. + */ +public class BulkRequestAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + // Don't use getBody(Class<T>) here as we don't want to coerce the body type using a type converter. + Object objBody = newExchange.getIn().getBody(); + if (!(objBody instanceof DocWriteRequest[])) { + throw new InvalidPayloadRuntimeException(newExchange, DocWriteRequest[].class); + } + + DocWriteRequest[] newBody = (DocWriteRequest[]) objBody; + BulkRequest request; + if (oldExchange == null) { + request = new BulkRequest(); + request.add(newBody); + newExchange.getIn().setBody(request); + return newExchange; + } else { + request = oldExchange.getIn().getBody(BulkRequest.class); + request.add(newBody); + return oldExchange; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java new file mode 100644 index 0000000..bec9583 --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elasticsearch5.converter; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.camel.Converter; +import org.apache.camel.Exchange; +import org.apache.camel.component.elasticsearch5.ElasticsearchConstants; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class ElasticsearchActionRequestConverter { + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchActionRequestConverter.class); + + private static final String ES_QUERY_DSL_PREFIX = "query"; + private static final String PARENT = "parent"; + + private ElasticsearchActionRequestConverter() { + } + + // Update requests + private static UpdateRequest createUpdateRequest(Object document, Exchange exchange) { + if (document instanceof UpdateRequest) { + return (UpdateRequest) document; + } + UpdateRequest updateRequest = new UpdateRequest(); + if (document instanceof byte[]) { + updateRequest.doc((byte[]) document); + } else if (document instanceof Map) { + updateRequest.doc((Map<String, Object>) document); + } else if (document instanceof String) { + updateRequest.doc((String) document); + } else if (document instanceof XContentBuilder) { + updateRequest.doc((XContentBuilder) document); + } else { + return null; + } + + return updateRequest + .waitForActiveShards(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class)) + .parent(exchange.getIn().getHeader( + PARENT, String.class)) + .index(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) + .type(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)) + .id(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_ID, String.class)); + } + + // Index requests + private static IndexRequest createIndexRequest(Object document, Exchange exchange) { + if (document instanceof IndexRequest) { + return (IndexRequest) document; + } + IndexRequest indexRequest = new IndexRequest(); + if (document instanceof byte[]) { + indexRequest.source((byte[]) document, XContentFactory.xContentType((byte[]) document)); + } else if (document instanceof Map) { + indexRequest.source((Map<String, Object>) document); + } else if (document instanceof String) { + indexRequest.source((String) document, XContentFactory.xContentType((String) document)); + } else if (document instanceof XContentBuilder) { + indexRequest.source((XContentBuilder) document); + } else { + return null; + } + + return indexRequest + .waitForActiveShards(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class)) + .parent(exchange.getIn().getHeader( + PARENT, String.class)) + .index(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) + .type(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)); + } + + public static IndexRequest toIndexRequest(Object document, Exchange exchange) { + return createIndexRequest(document, exchange) + .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class)); + } + + public static UpdateRequest toUpdateRequest(Object document, Exchange exchange) { + return createUpdateRequest(document, exchange) + .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class)); + } + + public static GetRequest toGetRequest(Object document, Exchange exchange) { + if (document instanceof GetRequest) { + return (GetRequest) document; + } + return new GetRequest(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) + .type(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_TYPE, + String.class)).id((String) document); + } + + public static DeleteRequest toDeleteRequest(Object document, Exchange exchange) { + if (document instanceof DeleteRequest) { + return (DeleteRequest) document; + } + if (document instanceof String) { + return new DeleteRequest() + .index(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_NAME, + String.class)) + .type(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_TYPE, + String.class)).id((String) document); + } else { + throw new IllegalArgumentException("Wrong body type. Only DeleteRequest or String is allowed as a type"); + } + } + + public static SearchRequest toSearchRequest(Object queryObject, Exchange exchange) throws IOException { + SearchRequest searchRequest = new SearchRequest(exchange.getIn() + .getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) + .types(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + String queryText = null; + + if (queryObject instanceof Map<?, ?>) { + Map<String, Object> mapQuery = (Map<String, Object>) queryObject; + // Remove 'query' prefix from the query object for backward compatibility + if (mapQuery.containsKey(ES_QUERY_DSL_PREFIX)) { + mapQuery = (Map<String, Object>) mapQuery.get(ES_QUERY_DSL_PREFIX); + } + try { + XContentBuilder contentBuilder = XContentFactory.contentBuilder(XContentType.JSON); + queryText = contentBuilder.map(mapQuery).string(); + } catch (IOException e) { + LOG.error(e.getMessage()); + } + } else if (queryObject instanceof String) { + queryText = (String) queryObject; + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonTextObject = mapper.readValue(queryText, JsonNode.class); + JsonNode parentJsonNode = jsonTextObject.get(ES_QUERY_DSL_PREFIX); + if (parentJsonNode != null) { + queryText = parentJsonNode.toString(); + } + } else { + // Cannot convert the queryObject into SearchRequest + return null; + } + + searchSourceBuilder.query(QueryBuilders.wrapperQuery(queryText)); + searchRequest.source(searchSourceBuilder); + + return searchRequest; + } + + public static BulkRequest toBulkRequest(Object documents, Exchange exchange) { + if (documents instanceof BulkRequest) { + return (BulkRequest) documents; + } + if (documents instanceof List) { + BulkRequest request = new BulkRequest(); + for (Object document : (List<Object>) documents) { + request.add(createIndexRequest(document, exchange)); + } + return request; + } else { + throw new IllegalArgumentException("Wrong body type. Only BulkRequest or List is allowed as a type"); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest b/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest new file mode 100644 index 0000000..13cc909 --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.elasticsearch5.ElasticsearchComponent \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java new file mode 100644 index 0000000..f0496e5 --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elasticsearch5; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.http.HttpHost; +import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner; +import org.elasticsearch.client.RestClient; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import static org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner.newConfigs; + +public class ElasticsearchBaseTest extends CamelTestSupport { + + + public static ElasticsearchClusterRunner runner; + public static String clusterName; + public static RestClient client; + + protected static final int ES_BASE_TRANSPORT_PORT = AvailablePortFinder.getNextAvailable(); + protected static final int ES_BASE_HTTP_PORT = AvailablePortFinder.getNextAvailable(ES_BASE_TRANSPORT_PORT+1); + + @SuppressWarnings("resource") + @BeforeClass + public static void cleanupOnce() throws Exception { + deleteDirectory("target/testcluster/"); + clusterName = "es-cl-run-" + System.currentTimeMillis(); + + runner = new ElasticsearchClusterRunner(); + // create ES nodes + runner.onBuild((number, settingsBuilder) -> { + settingsBuilder.put("http.cors.enabled", true); + settingsBuilder.put("http.cors.allow-origin", "*"); + }).build(newConfigs() + .clusterName(clusterName) + .numOfNode(1) + .baseHttpPort(ES_BASE_TRANSPORT_PORT) + .basePath("target/testcluster/")); + + // wait for green status + runner.ensureGreen(); + client = RestClient.builder(new HttpHost(InetAddress.getByName("localhost"),ES_BASE_HTTP_PORT)).build(); + } + + @AfterClass + public static void teardownOnce() throws IOException { + if (client != null) { + client.close(); + } + if (runner != null) { + runner.close(); + } + } + + @Override + public boolean isCreateCamelContextPerClass() { + // let's speed up the tests using the same context + return true; + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + final ElasticsearchComponent elasticsearchComponent = new ElasticsearchComponent(); + elasticsearchComponent.setHostAddresses("localhost:"+ES_BASE_HTTP_PORT); + context.addComponent("elasticsearch5-rest", elasticsearchComponent); + return context; + } + + /** + * As we don't delete the {@code target/data} folder for <b>each</b> test + * below (otherwise they would run much slower), we need to make sure + * there's no side effect of the same used data through creating unique + * indexes. + */ + Map<String, String> createIndexedData(String... additionalPrefixes) { + String prefix = createPrefix(); + + // take over any potential prefixes we may have been asked for + if (additionalPrefixes.length > 0) { + StringBuilder sb = new StringBuilder(prefix); + for (String additionalPrefix : additionalPrefixes) { + sb.append(additionalPrefix).append("-"); + } + prefix = sb.toString(); + } + + String key = prefix + "key"; + String value = prefix + "value"; + log.info("Creating indexed data using the key/value pair {} => {}", key, value); + + Map<String, String> map = new HashMap<String, String>(); + map.put(key, value); + return map; + } + + String createPrefix() { + // make use of the test method name to avoid collision + return getTestMethodName().toLowerCase() + "-"; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java new file mode 100644 index 0000000..979c71c --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elasticsearch5; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.camel.builder.RouteBuilder; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.notNullValue; + +public class ElasticsearchBulkTest extends ElasticsearchBaseTest { + + @Test + public void testBulkIndex() throws Exception { + List<Map<String, String>> documents = new ArrayList<Map<String, String>>(); + Map<String, String> document1 = createIndexedData("1"); + Map<String, String> document2 = createIndexedData("2"); + + documents.add(document1); + documents.add(document2); + + List<?> indexIds = template.requestBody("direct:bulk_index", documents, List.class); + assertNotNull("indexIds should be set", indexIds); + assertCollectionSize("Indexed documents should match the size of documents", indexIds, documents.size()); + } + + @Test + public void bulkIndexListRequestBody() throws Exception { + String prefix = createPrefix(); + + // given + List<Map<String, String>> request = new ArrayList<>(); + final HashMap<String, String> valueMap = new HashMap<>(); + valueMap.put("id",prefix+"baz"); + valueMap.put("content",prefix + "hello"); + request.add(valueMap); + // when + @SuppressWarnings("unchecked") + List<String> indexedDocumentIds = template.requestBody("direct:bulk_index", request, List.class); + + // then + assertThat(indexedDocumentIds, notNullValue()); + assertThat(indexedDocumentIds.size(), equalTo(1)); + } + + @Test + public void bulkIndexRequestBody() throws Exception { + String prefix = createPrefix(); + + // given + BulkRequest request = new BulkRequest(); + request.add(new IndexRequest(prefix + "foo", prefix + "bar", prefix + "baz").source("{\"" + prefix + "content\": \"" + prefix + "hello\"}")); + + // when + @SuppressWarnings("unchecked") + List<String> indexedDocumentIds = template.requestBody("direct:bulk_index", request, List.class); + + // then + assertThat(indexedDocumentIds, notNullValue()); + assertThat(indexedDocumentIds.size(), equalTo(1)); + assertThat(indexedDocumentIds, hasItem(prefix + "baz")); + } + + @Test + public void bulkRequestBody() throws Exception { + String prefix = createPrefix(); + + // given + BulkRequest request = new BulkRequest(); + request.add(new IndexRequest(prefix + "foo", prefix + "bar", prefix + "baz").source("{\"" + prefix + "content\": \"" + prefix + "hello\"}")); + + // when + BulkItemResponse[] response = (BulkItemResponse[]) template.requestBody("direct:bulk", request); + + // then + assertThat(response, notNullValue()); + assertEquals(prefix + "baz", response[0].getResponse().getId()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:bulk_index").to("elasticsearch5-rest://elasticsearch?operation=BulkIndex&indexName=twitter&indexType=tweet"); + from("direct:bulk").to("elasticsearch5-rest://elasticsearch?operation=Bulk&indexName=twitter&indexType=tweet&hostAddresses=localhost:" + ES_BASE_HTTP_PORT); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java new file mode 100644 index 0000000..6ecd3f9 --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elasticsearch5; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.http.HttpHost; +import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import static org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner.newConfigs; + +public class ElasticsearchClusterBaseTest extends CamelTestSupport { + + public static ElasticsearchClusterRunner runner; + public static String clusterName; + public static RestClient restclient; + public static RestHighLevelClient client; + + protected static final int ES_BASE_HTTP_PORT = AvailablePortFinder.getNextAvailable(); + protected static final int ES_FIRST_NODE_TRANSPORT_PORT = AvailablePortFinder.getNextAvailable(ES_BASE_HTTP_PORT + 1); + + @SuppressWarnings("resource") + @BeforeClass + public static void cleanUpOnce() throws Exception { + deleteDirectory("target/testcluster/"); + clusterName = "es-cl-run-" + System.currentTimeMillis(); + // create runner instance + + runner = new ElasticsearchClusterRunner(); + // create ES nodes + runner.onBuild((number, settingsBuilder) -> { + settingsBuilder.put("http.cors.enabled", true); + settingsBuilder.put("http.cors.allow-origin", "*"); + settingsBuilder.put("discovery.zen.ping.unicast.hosts", "127.0.0.1:9301,127.0.0.1:9302,127.0.0.1:9303"); + }).build(newConfigs() + .clusterName(clusterName) + .numOfNode(3) + .baseHttpPort(ES_BASE_HTTP_PORT) + .basePath("target/testcluster/") + .disableESLogger()); + + // wait for green status + runner.ensureGreen(); + restclient = RestClient.builder(new HttpHost(InetAddress.getByName("localhost"),ES_FIRST_NODE_TRANSPORT_PORT)).build(); + client = new RestHighLevelClient(restclient); + } + + @AfterClass + public static void teardownOnce() throws Exception { + if (restclient != null) { + restclient.close(); + } + if (runner != null) { + // close runner + runner.close(); + // delete all files + runner.clean(); + } + } + + @Override + public boolean isCreateCamelContextPerClass() { + // let's speed up the tests using the same context + return true; + } + + /** + * As we don't delete the {@code target/data} folder for <b>each</b> test + * below (otherwise they would run much slower), we need to make sure + * there's no side effect of the same used data through creating unique + * indexes. + */ + Map<String, String> createIndexedData(String... additionalPrefixes) { + String prefix = createPrefix(); + + // take over any potential prefixes we may have been asked for + if (additionalPrefixes.length > 0) { + StringBuilder sb = new StringBuilder(prefix); + for (String additionalPrefix : additionalPrefixes) { + sb.append(additionalPrefix).append("-"); + } + prefix = sb.toString(); + } + + String key = prefix + "key"; + String value = prefix + "value"; + log.info("Creating indexed data using the key/value pair {} => {}", key, value); + + Map<String, String> map = new HashMap<>(); + map.put(key, value); + return map; + } + + String createPrefix() { + // make use of the test method name to avoid collision + return getTestMethodName().toLowerCase() + "-"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java new file mode 100644 index 0000000..b0430d0 --- /dev/null +++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elasticsearch5; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.http.impl.client.BasicResponseHandler; +import org.elasticsearch.action.get.GetRequest; +import org.junit.Test; + +public class ElasticsearchClusterIndexTest extends ElasticsearchClusterBaseTest { + + @Test + public void indexWithIpAndPort() throws Exception { + Map<String, String> map = createIndexedData(); + Map<String, Object> headers = new HashMap<>(); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchOperation.Index); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_INDEX_ID, "1"); + + String indexId = template.requestBodyAndHeaders("direct:indexWithIpAndPort", map, headers, String.class); + assertNotNull("indexId should be set", indexId); + + headers.clear(); + + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchOperation.Index); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "status"); + headers.put(ElasticsearchConstants.PARAM_INDEX_ID, "2"); + + indexId = template.requestBodyAndHeaders("direct:indexWithIpAndPort", map, headers, String.class); + assertNotNull("indexId should be set", indexId); + + assertEquals("Cluster must be of three nodes", runner.getNodeSize(), 3); + assertEquals("Index id 1 must exists", true, client.get(new GetRequest("twitter", "tweet", "1")).isExists()); + assertEquals("Index id 2 must exists", true, client.get(new GetRequest("twitter", "status", "2")).isExists()); + } + + @Test + public void indexWithSnifferEnable() throws Exception { + Map<String, String> map = createIndexedData(); + Map<String, Object> headers = new HashMap<>(); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchOperation.Index); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "facebook"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "post"); + headers.put(ElasticsearchConstants.PARAM_INDEX_ID, "4"); + + String indexId = template.requestBodyAndHeaders("direct:indexWithSniffer", map, headers, String.class); + assertNotNull("indexId should be set", indexId); + + assertEquals("Cluster must be of three nodes", runner.getNodeSize(), 3); + assertEquals("Index id 4 must exists", true, client.get(new GetRequest("facebook", "post", "4")).isExists()); + + final BasicResponseHandler responseHandler = new BasicResponseHandler(); + String body = responseHandler.handleEntity(restclient.performRequest("GET", "/_cluster/health?pretty").getEntity()); + assertStringContains(body,"\"number_of_data_nodes\" : 3"); + + } + + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:indexWithIpAndPort") + .to("elasticsearch5-rest://" + clusterName + "?operation=Index&indexName=twitter&indexType=tweet&hostAddresses=localhost:" + ES_FIRST_NODE_TRANSPORT_PORT); + from("direct:indexWithSniffer") + .to("elasticsearch5-rest://" + clusterName + "?operation=Index&indexName=twitter&indexType=tweet&enableSniffer=true&hostAddresses=localhost:" + ES_FIRST_NODE_TRANSPORT_PORT); + } + }; + } +}
