Repository: flink Updated Branches: refs/heads/master b452c8bbb -> b5caaef82
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java index bc9bedc..93ac6c8 100644 --- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,217 +16,53 @@ */ package org.apache.flink.streaming.connectors.elasticsearch2; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; -import org.junit.Assert; -import org.junit.ClassRule; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase { - - private static final int NUM_ELEMENTS = 20; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); +public class 
ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { @Test public void testTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Node node = NodeBuilder.nodeBuilder() - .settings(Settings.settingsBuilder() - .put("path.home", dataDir.getParent()) - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-transport-client-cluster") - .node(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-transport-client-cluster"); - - // Can't use {@link TransportAddress} as its not Serializable in Elasticsearch 2.x - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } - - node.close(); + runTransportClientTest(); } - @Test(expected = IllegalArgumentException.class) - public void testNullTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Node node = NodeBuilder.nodeBuilder() - .settings(Settings.settingsBuilder() - .put("path.home", dataDir.getParent()) - .put("http.enabled", false) - 
.put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-transport-client-cluster") - .node(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-transport-client-cluster"); - - source.addSink(new ElasticsearchSink<>(config, null, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); + @Test + public void testNullTransportClient() throws Exception { + runNullTransportClientTest(); } - node.close(); - } - - @Test(expected = IllegalArgumentException.class) - public void testEmptyTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Node node = NodeBuilder.nodeBuilder() - .settings(Settings.settingsBuilder() - .put("path.home", dataDir.getParent()) - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-transport-client-cluster") - .node(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, 
otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-transport-client-cluster"); - - source.addSink(new ElasticsearchSink<>(config, new ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); + @Test + public void testEmptyTransportClient() throws Exception { + runEmptyTransportClientTest(); } - node.close(); - } - - @Test(expected = JobExecutionException.class) + @Test public void testTransportClientFails() throws Exception{ - // this checks whether the TransportClient fails early when there is no cluster to - // connect to. There isn't a similar test for the Node Client version since that - // one will block and wait for a cluster to come online - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-node-client-cluster"); - - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch Node Client Test"); + runTransportClientFailsTest(); } - private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { - private 
static final long serialVersionUID = 1L; - - private volatile boolean running = true; - - @Override - public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { - for (int i = 0; i < NUM_ELEMENTS && running; i++) { - ctx.collect(Tuple2.of(i, "message #" + i)); - } - } - - @Override - public void cancel() { - running = false; - } + @Override + protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction); } - private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; + @Override + protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode( + Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception { - public IndexRequest createIndexRequest(Tuple2<Integer, String> element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element.f1); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element.f0.toString()) - .source(json); - } + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - @Override - public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } + return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java deleted file mode 100644 index 05760e8..0000000 --- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.connectors.elasticsearch2.examples; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink; -import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction; -import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that - * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. - */ -public class ElasticsearchExample { - - public static void main(String[] args) throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SingleOutputStreamOperator<String> source = - env.generateSequence(0, 20).map(new MapFunction<Long, String>() { - /** - * The mapping method. Takes an element from the input data set and transforms - * it into exactly one element. - * - * @param value The input value. - * @return The transformed value - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. 
- */ - @Override - public String map(Long value) throws Exception { - return "message #" + value; - } - }); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(config, transports, new ElasticsearchSinkFunction<String>(){ - @Override - public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } - })); - - env.execute("Elasticsearch Example"); - } - - private static IndexRequest createIndexRequest(String element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element) - .source(json); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java new file mode 100644 index 0000000..8c50847 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.elasticsearch2.examples; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that + * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. 
+ */ +public class ElasticsearchSinkExample { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); + + Map<String, String> userConfig = new HashMap<>(); + userConfig.put("cluster.name", "elasticsearch"); + // This instructs the sink to emit after every element, otherwise they would be buffered + userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(userConfig, transports, new org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>(){ + @Override + public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) { + indexer.add(createIndexRequest(element)); + } + })); + + env.execute("Elasticsearch Sink Example"); + } + + private static IndexRequest createIndexRequest(String element) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .id(element) + .source(json); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties index dc20726..2055184 100644 --- 
a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties +++ b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties @@ -16,12 +16,12 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=OFF, testlogger +log4j.rootLogger=INFO, testlogger log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.target=System.err log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n # suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/pom.xml b/flink-connectors/flink-connector-elasticsearch5/pom.xml index 8fc5c8b..a0bd328 100644 --- a/flink-connectors/flink-connector-elasticsearch5/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch5/pom.xml @@ -26,7 +26,7 @@ under the License. <parent> <groupId>org.apache.flink</groupId> <artifactId>flink-connectors</artifactId> - <version>1.2-SNAPSHOT</version> + <version>1.3-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -37,7 +37,7 @@ under the License. <!-- Allow users to pass custom connector versions --> <properties> - <elasticsearch.version>5.0.0</elasticsearch.version> + <elasticsearch.version>5.1.2</elasticsearch.version> </properties> <dependencies> @@ -52,27 +52,38 @@ under the License. 
</dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-base_2.10</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- Elasticsearch Java Client has been moved to a different module in 5.x --> + <exclusion> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Dependency for Elasticsearch 5.x Java Client --> + <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>${elasticsearch.version}</version> </dependency> + <!-- + Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making + Log4j2 a strict dependency. The following is added so that the Log4j2 API in + Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible + in the logging implementation preferred. + --> + <dependency> <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-api</artifactId> - <version>2.7</version> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> + <artifactId>log4j-to-slf4j</artifactId> <version>2.7</version> </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - <!-- test dependencies --> <dependency> @@ -81,6 +92,7 @@ under the License. <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.10</artifactId> @@ -88,6 +100,63 @@ under the License. 
<scope>test</scope> <type>test-jar</type> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-base_2.10</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <!-- + Including Log4j2 dependencies for tests is required for the + embedded Elasticsearch nodes used in tests to run correctly. + --> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>2.7</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.7</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <!-- + For the tests, we need to exclude the Log4j2 to slf4j adapter dependency + and let Elasticsearch directly use Log4j2, otherwise the embedded Elasticsearch node + used in tests will fail to work. + + In other words, the connector jar is routing Elasticsearch 5.x's Log4j2 API's to SLF4J, + but for the test builds, we still stick to directly using Log4j2. 
+ --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.12.2</version> + <configuration> + <classpathDependencyExcludes> + <classpathDependencyExclude>org.apache.logging.log4j:log4j-to-slf4j</classpathDependencyExclude> + </classpathDependencyExcludes> + </configuration> + </plugin> + </plugins> + </build> + </project> http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java deleted file mode 100644 index f7ca499..0000000 --- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.connectors.elasticsearch5; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.bulk.BulkProcessor; - -public class BulkProcessorIndexer implements RequestIndexer { - private final BulkProcessor bulkProcessor; - - public BulkProcessorIndexer(BulkProcessor bulkProcessor) { - this.bulkProcessor = bulkProcessor; - } - - @Override - public void add(ActionRequest... actionRequests) { - for (ActionRequest actionRequest : actionRequests) { - this.bulkProcessor.add(actionRequest); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java new file mode 100644 index 0000000..1389e7d --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch5; + +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge; +import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils; +import org.apache.flink.util.Preconditions; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.transport.Netty3Plugin; +import org.elasticsearch.transport.client.PreBuiltTransportClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +/** + * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x. + */ +public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge { + + private static final long serialVersionUID = -5222683870097809633L; + + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch5ApiCallBridge.class); + + /** + * User-provided transport addresses. + * + * We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 5.x. 
+ */ + private final List<InetSocketAddress> transportAddresses; + + Elasticsearch5ApiCallBridge(List<InetSocketAddress> transportAddresses) { + Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty()); + this.transportAddresses = transportAddresses; + } + + @Override + public Client createClient(Map<String, String> clientConfig) { + Settings settings = Settings.builder().put(clientConfig) + .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME) + .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME) + .build(); + + TransportClient transportClient = new PreBuiltTransportClient(settings); + for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) { + transportClient.addTransportAddress(transport); + } + + // verify that we actually are connected to a cluster + if (transportClient.connectedNodes().isEmpty()) { + throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!"); + } + + if (LOG.isInfoEnabled()) { + LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes()); + } + + return transportClient; + } + + @Override + public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { + if (!bulkItemResponse.isFailed()) { + return null; + } else { + return bulkItemResponse.getFailure().getCause(); + } + } + + @Override + public void cleanup() { + // nothing to cleanup + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java index 29c69c4..9107d4e 100644 --- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java +++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java @@ -16,244 +16,61 @@ */ package org.apache.flink.streaming.connectors.elasticsearch5; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.util.Preconditions; -import org.elasticsearch.action.bulk.BulkItemResponse; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.network.NetworkModule; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.transport.Netty3Plugin; -import org.elasticsearch.transport.client.PreBuiltTransportClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; -import java.util.ArrayList; import 
java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; /** - * Sink that emits its input elements in bulk to an Elasticsearch cluster. - * <p> + * Elasticsearch 5.x sink that requests multiple {@link ActionRequest ActionRequests} + * against a cluster for each incoming element. + * * <p> - * The first {@link Map} passed to the constructor is forwarded to Elasticsearch when creating - * {@link TransportClient}. The config keys can be found in the Elasticsearch - * documentation. An important setting is {@code cluster.name}, this should be set to the name - * of the cluster that the sink should emit to. + * The sink internally uses a {@link TransportClient} to communicate with an Elasticsearch cluster. + * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor. + * * <p> - * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster - * can be connected to. + * The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found + * in the <a href="https://www.elastic.co">Elasticsearch documentation</a>. An important setting is {@code cluster.name}, + * which should be set to the name of the cluster that the sink should emit to. + * * <p> - * The second {@link Map} is used to configure a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}. + * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}. * This will buffer elements before sending a request to the cluster.
The behaviour of the * {@code BulkProcessor} can be configured using these config keys: * <ul> - * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer - * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer - * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two - * settings in milliseconds + * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer + * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer + * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two + * settings in milliseconds * </ul> + * * <p> - * <p> - * You also have to provide an {@link RequestIndexer}. This is used to create an - * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See - * {@link RequestIndexer} for an example. + * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple + * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of + * {@link ElasticsearchSinkFunction} for an example. * - * @param <T> Type of the elements emitted by this sink + * @param <T> Type of the elements handled by this sink */ -public class ElasticsearchSink<T> extends RichSinkFunction<T> { - - public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; - public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; - public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; +public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class); - - /** - * The user specified config map that we forward to Elasticsearch when we create the Client. 
- */ - private final Map<String, String> esConfig; - - /** - * The user specified config map that we use to configure BulkProcessor. - */ - private final Map<String, String> sinkConfig; - - /** - * The list of nodes that the TransportClient should connect to. This is null if we are using - * an embedded Node to get a Client. - */ - private final List<InetSocketAddress> transportAddresses; - - /** - * The builder that is used to construct an {@link IndexRequest} from the incoming element. - */ - private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction; - - /** - * The Client that was either retrieved from a Node or is a TransportClient. - */ - private transient Client client; - - /** - * Bulk processor that was created using the client - */ - private transient BulkProcessor bulkProcessor; - - /** - * Bulk {@link org.elasticsearch.action.ActionRequest} indexer - */ - private transient RequestIndexer requestIndexer; - - /** - * This is set from inside the BulkProcessor listener if there where failures in processing. - */ - private final AtomicBoolean hasFailure = new AtomicBoolean(false); - - /** - * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing. - */ - private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); - /** - * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient. + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}. 
* - * @param esConfig The map of user settings that are passed when constructing the TransportClient - * @param sinkConfig The map of user settings that are passed when constructing the BulkProcessor - * @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient} - * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element + * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} + * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest ActionRequests} from the incoming element */ - public ElasticsearchSink(Map<String, String> esConfig, Map<String, String> sinkConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - this.esConfig = esConfig; - this.sinkConfig = sinkConfig; - this.elasticsearchSinkFunction = elasticsearchSinkFunction; - Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0); - this.transportAddresses = transportAddresses; + public ElasticsearchSink(Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction); } - - /** - * Initializes the connection to Elasticsearch by creating a - * {@link org.elasticsearch.client.transport.TransportClient}.
- */ - @Override - public void open(Configuration configuration) { - List<TransportAddress> transportNodes; - transportNodes = new ArrayList<>(transportAddresses.size()); - for (InetSocketAddress address : transportAddresses) { - transportNodes.add(new InetSocketTransportAddress(address)); - } - - Settings settings = Settings.builder().put(esConfig) - .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME) - .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME) - .build(); - - TransportClient transportClient = new PreBuiltTransportClient(settings); - for (TransportAddress transport : transportNodes) { - transportClient.addTransportAddress(transport); - } - - // verify that we actually are connected to a cluster - if (transportClient.connectedNodes().isEmpty()) { - throw new RuntimeException("Client is not connected to any Elasticsearch nodes!"); - } - - client = transportClient; - - if (LOG.isInfoEnabled()) { - LOG.info("Created Elasticsearch TransportClient {}", client); - } - - BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - for (BulkItemResponse itemResp : response.getItems()) { - if (itemResp.isFailed()) { - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); - } - } - hasFailure.set(true); - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - LOG.error(failure.getMessage()); - failureThrowable.compareAndSet(null, failure); - hasFailure.set(true); - } - }); - - // This makes flush() blocking - bulkProcessorBuilder.setConcurrentRequests(0); - - ParameterTool params = 
ParameterTool.fromMap(sinkConfig); - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { - bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) { - bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt( - CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) { - bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS))); - } - - bulkProcessor = bulkProcessorBuilder.build(); - requestIndexer = new BulkProcessorIndexer(bulkProcessor); - } - - @Override - public void invoke(T element) { - elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer); - } - - @Override - public void close() { - if (bulkProcessor != null) { - bulkProcessor.close(); - bulkProcessor = null; - } - - if (client != null) { - client.close(); - } - - if (hasFailure.get()) { - Throwable cause = failureThrowable.get(); - if (cause != null) { - throw new RuntimeException("An error occurred in ElasticsearchSink.", cause); - } else { - throw new RuntimeException("An error occurred in ElasticsearchSink."); - } - } - - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java deleted file mode 100644 index 752a83e..0000000 --- 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch5; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.RuntimeContext; - -import java.io.Serializable; - -/** - * Method that creates an {@link org.elasticsearch.action.ActionRequest} from an element in a Stream. - * - * <p> - * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch. 
- * - * <p> - * Example: - * - * <pre>{@code - * private static class TestElasticSearchSinkFunction implements - * ElasticsearchSinkFunction<Tuple2<Integer, String>> { - * - * public IndexRequest createIndexRequest(Tuple2<Integer, String> element) { - * Map<String, Object> json = new HashMap<>(); - * json.put("data", element.f1); - * - * return Requests.indexRequest() - * .index("my-index") - * .type("my-type") - * .id(element.f0.toString()) - * .source(json); - * } - * - * public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) { - * indexer.add(createIndexRequest(element)); - * } - * } - * - * }</pre> - * - * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction} - */ -public interface ElasticsearchSinkFunction<T> extends Serializable, Function { - void process(T element, RuntimeContext ctx, RequestIndexer indexer); -} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java deleted file mode 100644 index 170df31..0000000 --- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch5; - -import org.elasticsearch.action.ActionRequest; - -import java.io.Serializable; - -public interface RequestIndexer extends Serializable { - void add(ActionRequest... actionRequests); -} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java new file mode 100644 index 0000000..f3d8897 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSinkITCase; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.internal.InternalSettingsPreparer; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.Netty3Plugin; + +import java.io.File; +import java.util.Collections; + +/** + * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 5.x. + * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests. 
+ */ +public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment { + + private Node node; + + @Override + public void start(File tmpDataFolder, String clusterName) throws Exception { + if (node == null) { + Settings settings = Settings.builder() + .put("cluster.name", clusterName) + .put("http.enabled", false) + .put("path.home", tmpDataFolder.getParent()) + .put("path.data", tmpDataFolder.getAbsolutePath()) + .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME) + .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME) + .build(); + + node = new PluginNode(settings); + node.start(); + } + } + + @Override + public void close() throws Exception { + if (node != null && !node.isClosed()) { + node.close(); + node = null; + } + } + + @Override + public Client getClient() { + if (node != null && !node.isClosed()) { + return node.client(); + } else { + return null; + } + } + + private static class PluginNode extends Node { + public PluginNode(Settings settings) { + super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? 
extends Plugin>>singletonList(Netty3Plugin.class)); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java index b4a370b..3ebda52 100644 --- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java +++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -17,184 +17,54 @@ */ package org.apache.flink.streaming.connectors.elasticsearch5; -import com.google.common.collect.ImmutableMap; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.network.NetworkModule; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.internal.InternalSettingsPreparer; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.transport.Netty3Plugin; -import org.junit.ClassRule; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase { - - private static final int NUM_ELEMENTS = 20; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); +public class ElasticsearchSinkITCase 
extends ElasticsearchSinkTestBase { @Test public void testTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Settings settings = Settings.builder() - .put("cluster.name", "my-transport-client-cluster") - .put("http.enabled", false) - .put("path.home", dataDir.getParent()) - .put("path.data", dataDir.getAbsolutePath()) - .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME) - .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME) - .build(); - - Node node = new PluginNode(settings); - node.start(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster"); - Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.prepareGet("my-index", "my-type", Integer.toString(i)).get(); - assertEquals("message #" + i, response.getSource().get("data")); - } - - node.close(); + runTransportClientTest(); } - @Test(expected = IllegalArgumentException.class) + @Test public void testNullTransportClient() throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> esConfig = ImmutableMap.of("cluster.name", 
"my-transport-client-cluster"); - Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, null, new TestElasticsearchSinkFunction())); - - fail(); + runNullTransportClientTest(); } - @Test(expected = IllegalArgumentException.class) + @Test public void testEmptyTransportClient() throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster"); - Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, new ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - fail(); + runEmptyTransportClientTest(); } - @Test(expected = JobExecutionException.class) + @Test public void testTransportClientFails() throws Exception { - // this checks whether the TransportClient fails early when there is no cluster to - // connect to. 
There isn't a similar test for the Node Client version since that - // one will block and wait for a cluster to come online - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster"); - Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch Node Client Test"); - - fail(); + runTransportClientFailsTest(); } - private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; - - private volatile boolean running = true; - - @Override - public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { - for (int i = 0; i < NUM_ELEMENTS && running; i++) { - ctx.collect(Tuple2.of(i, "message #" + i)); - } - } - - @Override - public void cancel() { - running = false; - } + @Override + protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction); } - private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; - - public IndexRequest createIndexRequest(Tuple2<Integer, String> element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element.f1); + @Override + protected 
<T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode( + Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception { - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element.f0.toString()) - .source(json); - } + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - @Override - public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } + return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction); } - private static class PluginNode extends Node { - public PluginNode(Settings settings) { - super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? extends Plugin>>singletonList(Netty3Plugin.class)); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java deleted file mode 100644 index 47ce846..0000000 --- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch5.examples; - -import com.google.common.collect.ImmutableMap; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink; -import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSinkFunction; -import org.apache.flink.streaming.connectors.elasticsearch5.RequestIndexer; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that - * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. 
- */ -public class ElasticsearchExample { - - public static void main(String[] args) throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SingleOutputStreamOperator<String> source = - env.generateSequence(0, 20).map(new MapFunction<Long, String>() { - @Override - public String map(Long value) throws Exception { - return "message #" + value; - } - }); - - Map<String, String> esConfig = ImmutableMap.of("cluster.name", "elasticsearch"); - - // This instructs the sink to emit after every element, otherwise they would be buffered - Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction<String>() { - @Override - public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } - })); - - env.execute("Elasticsearch Example"); - } - - private static IndexRequest createIndexRequest(String element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element) - .source(json); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java new file mode 100644 index 0000000..4135283 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.elasticsearch5.examples; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that + * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. 
+ */ +public class ElasticsearchSinkExample { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); + + Map<String, String> userConfig = new HashMap<>(); + userConfig.put("cluster.name", "elasticsearch"); + // This instructs the sink to emit after every element, otherwise they would be buffered + userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() { + @Override + public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { + indexer.add(createIndexRequest(element)); + } + })); + + env.execute("Elasticsearch Sink Example"); + } + + private static IndexRequest createIndexRequest(String element) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .id(element) + .source(json); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..2055184 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ 
+################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target=System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties deleted file mode 100644 index dc20726..0000000 --- a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties +++ /dev/null @@ -1,27 +0,0 @@ 
-################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=OFF, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index e19c77f..5d8ca70 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -45,9 +45,9 @@ under the License. 
<module>flink-connector-kafka-0.8</module> <module>flink-connector-kafka-0.9</module> <module>flink-connector-kafka-0.10</module> + <module>flink-connector-elasticsearch-base</module> <module>flink-connector-elasticsearch</module> <module>flink-connector-elasticsearch2</module> - <module>flink-connector-elasticsearch5</module> <module>flink-connector-rabbitmq</module> <module>flink-connector-twitter</module> <module>flink-connector-nifi</module> @@ -86,6 +86,20 @@ under the License. <module>flink-connector-kinesis</module> </modules> </profile> + + <!-- + Since Elasticsearch 5.x requires Java 8 at a minimum, we use this profile + to include it as part of Java 8 builds only. + --> + <profile> + <id>include-elasticsearch5</id> + <activation> + <jdk>1.8</jdk> + </activation> + <modules> + <module>flink-connector-elasticsearch5</module> + </modules> + </profile> </profiles> </project>
