normalize package names in streams-examples/local
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/5b96588c Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/5b96588c Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/5b96588c Branch: refs/heads/master Commit: 5b96588c492cfafa1732e5d5a680df8485bdb491 Parents: e949f58 Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Tue Oct 11 16:30:11 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Tue Oct 11 16:39:07 2016 -0500 ---------------------------------------------------------------------- .../example/ElasticsearchHdfs.java | 85 ----- .../example/HdfsElasticsearch.java | 86 ----- .../streams/example/ElasticsearchHdfs.java | 80 +++++ .../streams/example/HdfsElasticsearch.java | 80 +++++ .../ElasticsearchHdfsConfiguration.json | 2 +- .../HdfsElasticsearchConfiguration.json | 2 +- .../elasticsearch/test/ElasticsearchHdfsIT.java | 128 -------- .../example/elasticsearch/test/ExampleITs.java | 17 - .../elasticsearch/test/HdfsElasticsearchIT.java | 132 -------- .../example/test/ElasticsearchHdfsIT.java | 112 +++++++ .../apache/streams/example/test/ExampleITs.java | 17 + .../example/test/HdfsElasticsearchIT.java | 118 +++++++ .../example/ElasticsearchReindex.java | 84 ----- .../streams/example/ElasticsearchReindex.java | 80 +++++ .../ElasticsearchReindexConfiguration.json | 2 +- .../test/ElasticsearchReindexChildIT.java | 121 ------- .../test/ElasticsearchReindexIT.java | 120 ------- .../test/ElasticsearchReindexParentIT.java | 133 -------- .../example/elasticsearch/test/ReindexITs.java | 20 -- .../test/ElasticsearchReindexChildIT.java | 121 +++++++ .../example/test/ElasticsearchReindexIT.java | 120 +++++++ .../test/ElasticsearchReindexParentIT.java | 133 ++++++++ .../apache/streams/example/test/ReindexITs.java | 20 ++ .../example/MongoElasticsearchSync.java | 79 ----- .../streams/example/MongoElasticsearchSync.java | 79 +++++ .../MongoElasticsearchSyncConfiguration.json | 2 +- .../mongodb/test/MongoElasticsearchSyncIT.java | 121 ------- .../streams/example/mongodb/test/SyncITs.java | 16 - .../example/test/MongoElasticsearchSyncIT.java | 117 +++++++ .../apache/streams/example/test/SyncITs.java | 16 + .../src/test/resources/testSync.json | 21 -- local/pom.xml | 5 +- local/twitter-follow-graph/README.md | 8 - local/twitter-follow-graph/pom.xml | 316 ------------------- .../example/graph/TwitterFollowGraph.java | 103 ------ .../TwitterFollowGraphConfiguration.json | 13 - .../src/main/resources/TwitterFollowGraph.dot | 39 --- .../src/site/markdown/index.md | 75 ----- .../twitter/example/TwitterFollowGraphIT.java | 79 ----- .../test/resources/TwitterFollowGraphIT.conf | 28 -- local/twitter-follow-neo4j/README.md | 8 + ...itter-follow-graph-jar-with-dependencies.jar | Bin 0 -> 25829072 bytes local/twitter-follow-neo4j/pom.xml | 316 +++++++++++++++++++ .../streams/example/TwitterFollowNeo4j.java | 93 ++++++ .../TwitterFollowNeo4jConfiguration.json | 13 + .../src/main/resources/TwitterFollowNeo4j.dot | 39 +++ .../src/site/markdown/TwitterFollowNeo4j.md | 33 ++ .../src/site/markdown/index.md | 42 +++ .../src/site/resources/TwitterFollowGraph.dot | 39 +++ .../TwitterFollowGraphConfiguration.json | 13 + .../src/site/resources/TwitterFollowNeo4j.dot | 39 +++ .../TwitterFollowNeo4jConfiguration.json | 13 + local/twitter-follow-neo4j/src/site/site.xml | 45 +++ .../example/test/TwitterFollowNeo4jIT.java | 79 +++++ .../test/resources/TwitterFollowGraphIT.conf | 28 ++ .../example/TwitterHistoryElasticsearch.java | 81 +++++ .../twitter/TwitterHistoryElasticsearch.java | 90 ------ ...witterHistoryElasticsearchConfiguration.json | 2 +- .../test/TwitterHistoryElasticsearchIT.java | 108 +++++++ .../example/TwitterHistoryElasticsearchIT.java | 108 ------- .../example/TwitterUserstreamElasticsearch.java | 146 +++++++++ .../example/TwitterUserstreamElasticsearch.java | 146 --------- ...terUserstreamElasticsearchConfiguration.json | 2 +- .../test/TwitterUserstreamElasticsearchIT.java | 109 +++++++ .../test/TwitterUserstreamElasticsearchIT.java | 111 ------- 65 files changed, 2344 insertions(+), 2289 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java deleted file mode 100644 index da0acbd..0000000 --- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.elasticsearch.example; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.elasticsearch.ElasticsearchPersistReader; -import org.apache.streams.hdfs.WebHdfsPersistWriter; -import org.apache.streams.core.StreamBuilder; -import org.apache.streams.local.builders.LocalStreamBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.concurrent.*; - -/** - * Copies documents into a new index - */ -public class ElasticsearchHdfs implements Runnable { - - public final static String STREAMS_ID = "ElasticsearchHdfs"; - - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class); - - ElasticsearchHdfsConfiguration config; - - public ElasticsearchHdfs() { - this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); - - } - - public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) { - this.config = reindex; - } - - public static void main(String[] args) - { - LOGGER.info(StreamsConfigurator.config.toString()); - - ElasticsearchHdfs backup = new ElasticsearchHdfs(); - - new Thread(backup).start(); - - } - - @Override - public void run() { - - ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource()); - - WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination()); - - Map<String, Object> streamConfig = Maps.newHashMap(); - streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID); - streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000); - StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig); - - builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader); - builder.addStreamsPersistWriter(WebHdfsPersistWriter.STREAMS_ID, hdfsPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID); - builder.start(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java deleted file mode 100644 index 0a65479..0000000 --- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.elasticsearch.example; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.hdfs.WebHdfsPersistReader; -import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; -import org.apache.streams.core.StreamBuilder; -import org.apache.streams.local.builders.LocalStreamBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.math.BigInteger; -import java.util.Map; -import java.util.concurrent.*; - -/** - * Copies documents into a new index - */ -public class HdfsElasticsearch implements Runnable { - - public final static String STREAMS_ID = "HdfsElasticsearch"; - - private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class); - - HdfsElasticsearchConfiguration config; - - public HdfsElasticsearch() { - this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); - - } - - public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) { - this.config = reindex; - } - - public static void main(String[] args) - { - LOGGER.info(StreamsConfigurator.config.toString()); - - HdfsElasticsearch restore = new HdfsElasticsearch(); - - new Thread(restore).start(); - - } - - @Override - public void run() { - - WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource()); - - ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination()); - - Map<String, Object> streamConfig = Maps.newHashMap(); - streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID); - streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000); - StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig); - - builder.newPerpetualStream(WebHdfsPersistReader.STREAMS_ID, webHdfsPersistReader); - builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, WebHdfsPersistReader.STREAMS_ID); - builder.start(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java new file mode 100644 index 0000000..8d3cf36 --- /dev/null +++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.example; + +import com.google.common.collect.Maps; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.elasticsearch.ElasticsearchPersistReader; +import org.apache.streams.example.ElasticsearchHdfsConfiguration; +import org.apache.streams.hdfs.WebHdfsPersistWriter; +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.local.builders.LocalStreamBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Copies documents into a new index + */ +public class ElasticsearchHdfs implements Runnable { + + public final static String STREAMS_ID = "ElasticsearchHdfs"; + + private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class); + + ElasticsearchHdfsConfiguration config; + + public ElasticsearchHdfs() { + this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + + } + + public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) { + this.config = reindex; + } + + public static void main(String[] args) + { + LOGGER.info(StreamsConfigurator.config.toString()); + + ElasticsearchHdfs backup = new ElasticsearchHdfs(); + + new Thread(backup).start(); + + } + + @Override + public void run() { + + ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource()); + + WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination()); + + Map<String, Object> streamConfig = Maps.newHashMap(); + streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID); + streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000); + StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig); + + builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader); + builder.addStreamsPersistWriter(WebHdfsPersistWriter.STREAMS_ID, hdfsPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID); + builder.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java new file mode 100644 index 0000000..847ac48 --- /dev/null +++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.example; + +import com.google.common.collect.Maps; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.example.HdfsElasticsearchConfiguration; +import org.apache.streams.hdfs.WebHdfsPersistReader; +import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.local.builders.LocalStreamBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Copies documents into a new index + */ +public class HdfsElasticsearch implements Runnable { + + public final static String STREAMS_ID = "HdfsElasticsearch"; + + private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class); + + HdfsElasticsearchConfiguration config; + + public HdfsElasticsearch() { + this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + + } + + public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) { + this.config = reindex; + } + + public static void main(String[] args) + { + LOGGER.info(StreamsConfigurator.config.toString()); + + HdfsElasticsearch restore = new HdfsElasticsearch(); + + new Thread(restore).start(); + + } + + @Override + public void run() { + + WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource()); + + ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination()); + + Map<String, Object> streamConfig = Maps.newHashMap(); + streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID); + streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000); + StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig); + + builder.newPerpetualStream(WebHdfsPersistReader.STREAMS_ID, webHdfsPersistReader); + builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, WebHdfsPersistReader.STREAMS_ID); + builder.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json b/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json index 9ad7e54..ee17a3d 100644 --- a/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json +++ b/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json @@ -4,7 +4,7 @@ "http://www.apache.org/licenses/LICENSE-2.0" ], "type": "object", - "javaType" : "org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration", + "javaType" : "org.apache.streams.example.ElasticsearchHdfsConfiguration", "javaInterfaces": ["java.io.Serializable"], "properties": { "source": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration", "type": "object", "required": true }, http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json b/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json index 8b77225..4239279 100644 --- a/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json +++ b/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json @@ -4,7 +4,7 @@ "http://www.apache.org/licenses/LICENSE-2.0" ], "type": "object", - "javaType" : "org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration", + "javaType" : "org.apache.streams.example.HdfsElasticsearchConfiguration", "javaInterfaces": ["java.io.Serializable"], "properties": { "source": { "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration", "type": "object", "required": true }, http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java deleted file mode 100644 index 8dfe244..0000000 --- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.example.elasticsearch.test; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.elasticsearch.ElasticsearchClientManager; -import org.apache.streams.elasticsearch.ElasticsearchConfiguration; -import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; -import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; -import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; -import org.apache.streams.elasticsearch.example.ElasticsearchHdfs; -import org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration; -import org.apache.streams.elasticsearch.example.HdfsElasticsearch; -import org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertNotEquals; - -/** - * Test copying documents between hdfs and elasticsearch - */ -public class ElasticsearchHdfsIT { - - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class); - - ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - protected ElasticsearchHdfsConfiguration testConfiguration; - protected Client testClient; - - private int count = 0; - - @Before - public void prepareTest() throws Exception { - - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Properties es_properties = new Properties(); - InputStream es_stream = new FileInputStream("elasticsearch.properties"); - es_properties.load(es_stream); - Config esProps = ConfigFactory.parseProperties(es_properties); - Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); - StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); - testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe); - testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient(); - - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0)); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertTrue(indicesExistsResponse.isExists()); - - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getSource().getIndexes().get(0)) - .setTypes(testConfiguration.getSource().getTypes().get(0)); - SearchResponse countResponse = countRequest.execute().actionGet(); - - count = (int)countResponse.getHits().getTotalHits(); - - assertNotEquals(count, 0); - } - - @Test - public void ElasticsearchHdfsIT() throws Exception { - - ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration); - - backup.run(); - - // assert lines in file - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java deleted file mode 100644 index ab882c8..0000000 --- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.apache.streams.example.elasticsearch.test; - -import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT; -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - -@RunWith(Suite.class) [email protected]({ - ElasticsearchPersistWriterIT.class, - ElasticsearchHdfsIT.class, - HdfsElasticsearchIT.class, -}) - -public class ExampleITs { - // the class remains empty, - // used only as a holder for the above annotations -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java deleted file mode 100644 index 1a055f6..0000000 --- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.example.elasticsearch.test; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.elasticsearch.ElasticsearchClientManager; -import org.apache.streams.elasticsearch.ElasticsearchConfiguration; -import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; -import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; -import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; -import org.apache.streams.elasticsearch.example.ElasticsearchHdfs; -import org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration; -import org.apache.streams.elasticsearch.example.HdfsElasticsearch; -import org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -/** - * Test copying documents between hdfs and elasticsearch - */ -public class HdfsElasticsearchIT { - - private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class); - - ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - protected HdfsElasticsearchConfiguration testConfiguration; - protected Client testClient; - - @Before - public void prepareTest() throws Exception { - - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Properties es_properties = new Properties(); - InputStream es_stream = new FileInputStream("elasticsearch.properties"); - es_properties.load(es_stream); - Config esProps = ConfigFactory.parseProperties(es_properties); - Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); - StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); - testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe); - testClient = new ElasticsearchClientManager(testConfiguration.getDestination()).getClient(); - - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - if(indicesExistsResponse.isExists()) { - DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex()); - DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); - assertTrue(deleteIndexResponse.isAcknowledged()); - }; - } - - @Test - public void ElasticsearchHdfsIT() throws Exception { - - HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration); - - restore.run(); - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertTrue(indicesExistsResponse.isExists()); - - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getDestination().getIndex()) - .setTypes(testConfiguration.getDestination().getType()); - SearchResponse countResponse = countRequest.execute().actionGet(); - - assertEquals(89, countResponse.getHits().getTotalHits()); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java new file mode 100644 index 0000000..8e87f3a --- /dev/null +++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.example.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.elasticsearch.ElasticsearchClientManager; +import org.apache.streams.example.ElasticsearchHdfs; +import org.apache.streams.example.ElasticsearchHdfsConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertNotEquals; + +/** + * Test copying documents between hdfs and elasticsearch + */ +public class ElasticsearchHdfsIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class); + + ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + protected ElasticsearchHdfsConfiguration testConfiguration; + protected Client testClient; + + private int count = 0; + + @Before + public void prepareTest() throws Exception { + + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Properties es_properties = new Properties(); + InputStream es_stream = new FileInputStream("elasticsearch.properties"); + es_properties.load(es_stream); + Config esProps = ConfigFactory.parseProperties(es_properties); + Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); + StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); + testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe); + testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient(); + + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0)); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + assertTrue(indicesExistsResponse.isExists()); + + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getSource().getIndexes().get(0)) + .setTypes(testConfiguration.getSource().getTypes().get(0)); + SearchResponse countResponse = countRequest.execute().actionGet(); + + count = (int)countResponse.getHits().getTotalHits(); + + assertNotEquals(count, 0); + } + + @Test + public void ElasticsearchHdfsIT() throws Exception { + + ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration); + + backup.run(); + + // assert lines in file + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java new file mode 100644 index 0000000..5965914 --- /dev/null +++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java @@ -0,0 +1,17 @@ +package org.apache.streams.example.test; + +import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) [email protected]({ + ElasticsearchPersistWriterIT.class, + ElasticsearchHdfsIT.class, + HdfsElasticsearchIT.class, +}) + +public class ExampleITs { + // the class remains empty, + // used only as a holder for the above annotations +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java new file mode 100644 index 0000000..4eb7fc0 --- /dev/null +++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.example.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.elasticsearch.ElasticsearchClientManager; +import org.apache.streams.example.HdfsElasticsearch; +import org.apache.streams.example.HdfsElasticsearchConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Test copying documents between hdfs and elasticsearch + */ +public class HdfsElasticsearchIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class); + + ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + protected HdfsElasticsearchConfiguration testConfiguration; + protected Client testClient; + + @Before + public void prepareTest() throws Exception { + + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Properties es_properties = new Properties(); + InputStream es_stream = new FileInputStream("elasticsearch.properties"); + es_properties.load(es_stream); + Config esProps = ConfigFactory.parseProperties(es_properties); + Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); + StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); + testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe); + testClient = new ElasticsearchClientManager(testConfiguration.getDestination()).getClient(); + + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + if(indicesExistsResponse.isExists()) { + DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex()); + DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); + assertTrue(deleteIndexResponse.isAcknowledged()); + }; + } + + @Test + public void ElasticsearchHdfsIT() throws Exception { + + HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration); + + restore.run(); + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + assertTrue(indicesExistsResponse.isExists()); + + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getDestination().getIndex()) + .setTypes(testConfiguration.getDestination().getType()); + SearchResponse countResponse = countRequest.execute().actionGet(); + + assertEquals(89, countResponse.getHits().getTotalHits()); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java deleted file mode 100644 index dc94773..0000000 --- a/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.elasticsearch.example; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.elasticsearch.*; -import org.apache.streams.core.StreamBuilder; -import org.apache.streams.local.builders.LocalStreamBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.concurrent.*; - -/** - * Copies documents into a new index - */ -public class ElasticsearchReindex implements Runnable { - - public final static String STREAMS_ID = "ElasticsearchReindex"; - - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class); - - ElasticsearchReindexConfiguration config; - - public ElasticsearchReindex() { - this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); - - } - - public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) { - this.config = reindex; - } - - public static void main(String[] args) - { - LOGGER.info(StreamsConfigurator.config.toString()); - - ElasticsearchReindex reindex = new ElasticsearchReindex(); - - new Thread(reindex).start(); - - } - - @Override - public void run() { - - ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource()); - - ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination()); - - Map<String, Object> streamConfig = Maps.newHashMap(); - streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID); - streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000); - StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig); - - builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader); - builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID); - builder.start(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java new file mode 100644 index 0000000..dfb2a98 --- /dev/null +++ b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.example; + +import com.google.common.collect.Maps; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.elasticsearch.ElasticsearchPersistReader; +import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; +import org.apache.streams.example.ElasticsearchReindexConfiguration; +import org.apache.streams.local.builders.LocalStreamBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Copies documents into a new index + */ +public class ElasticsearchReindex implements Runnable { + + public final static String STREAMS_ID = "ElasticsearchReindex"; + + private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class); + + ElasticsearchReindexConfiguration config; + + public ElasticsearchReindex() { + this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + + } + + public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) { + this.config = reindex; + } + + public static void main(String[] args) + { + LOGGER.info(StreamsConfigurator.config.toString()); + + ElasticsearchReindex reindex = new ElasticsearchReindex(); + + new Thread(reindex).start(); + + } + + @Override + public void run() { + + ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource()); + + ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination()); + + Map<String, Object> streamConfig = Maps.newHashMap(); + streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID); + streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000); + StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig); + + builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader); + builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID); + builder.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json ---------------------------------------------------------------------- diff --git a/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json b/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json index 1237538..09bdf5b 100644 --- a/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json +++ b/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json @@ -4,7 +4,7 @@ "http://www.apache.org/licenses/LICENSE-2.0" ], "type": "object", - "javaType" : "org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration", + "javaType" : "org.apache.streams.example.ElasticsearchReindexConfiguration", "javaInterfaces": ["java.io.Serializable"], "properties": { "source": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration", "type": "object", "required": true }, http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java deleted file mode 100644 index d033014..0000000 --- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.example.elasticsearch.test; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.elasticsearch.ElasticsearchClientManager; -import org.apache.streams.elasticsearch.example.ElasticsearchReindex; -import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; - -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -/** - * Test copying parent/child associated documents between two indexes on same cluster - */ -public class ElasticsearchReindexChildIT { - - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class); - - ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - protected ElasticsearchReindexConfiguration testConfiguration; - protected Client testClient; - - private int count = 0; - - @Before - public void prepareTest() throws Exception { - - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Properties es_properties = new Properties(); - InputStream es_stream = new FileInputStream("elasticsearch.properties"); - es_properties.load(es_stream); - Config esProps = ConfigFactory.parseProperties(es_properties); - Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); - StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); - testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe); - testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient(); - - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0)); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertTrue(indicesExistsResponse.isExists()); - - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getSource().getIndexes().get(0)) - .setTypes(testConfiguration.getSource().getTypes().get(0)); - SearchResponse countResponse = countRequest.execute().actionGet(); - - count = (int)countResponse.getHits().getTotalHits(); - - assertNotEquals(count, 0); - - } - - @Test - public void testReindex() throws Exception { - - ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration); - - reindex.run(); - - // assert lines in file - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getDestination().getIndex()) - .setTypes(testConfiguration.getDestination().getType()); - SearchResponse countResponse = countRequest.execute().actionGet(); - - assertEquals(count, (int)countResponse.getHits().getTotalHits()); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java deleted file mode 100644 index 5854ac0..0000000 --- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.example.elasticsearch.test; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.elasticsearch.ElasticsearchClientManager; -import org.apache.streams.elasticsearch.example.ElasticsearchReindex; -import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; - -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -/** - * Test copying documents between two indexes on same cluster - */ -public class ElasticsearchReindexIT { - - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class); - - ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - protected ElasticsearchReindexConfiguration testConfiguration; - protected Client testClient; - - private int count = 0; - - @Before - public void prepareTest() throws Exception { - - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Properties es_properties = new Properties(); - InputStream es_stream = new FileInputStream("elasticsearch.properties"); - es_properties.load(es_stream); - Config esProps = ConfigFactory.parseProperties(es_properties); - Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); - StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); - testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe); - testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient(); - - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0)); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertTrue(indicesExistsResponse.isExists()); - - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getSource().getIndexes().get(0)) - .setTypes(testConfiguration.getSource().getTypes().get(0)); - SearchResponse countResponse = countRequest.execute().actionGet(); - - count = (int)countResponse.getHits().getTotalHits(); - - assertNotEquals(count, 0); - - } - - @Test - public void testReindex() throws Exception { - - ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration); - - reindex.run(); - - // assert lines in file - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getDestination().getIndex()) - .setTypes(testConfiguration.getDestination().getType()); - SearchResponse countResponse = countRequest.execute().actionGet(); - - assertEquals(count, (int)countResponse.getHits().getTotalHits()); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java deleted file mode 100644 index 90924e7..0000000 --- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.example.elasticsearch.test; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.elasticsearch.ElasticsearchClientManager; -import org.apache.streams.elasticsearch.example.ElasticsearchReindex; -import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration; -import org.apache.streams.elasticsearch.test.ElasticsearchParentChildWriterIT; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.net.URL; -import java.util.Properties; - -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -/** - * Test copying parent/child associated documents between two indexes on same cluster - */ -public class ElasticsearchReindexParentIT { - - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class); - - ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - protected ElasticsearchReindexConfiguration testConfiguration; - protected Client testClient; - - private int count = 0; - - @Before - public void prepareTest() throws Exception { - - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Properties es_properties = new Properties(); - InputStream es_stream = new FileInputStream("elasticsearch.properties"); - es_properties.load(es_stream); - Config esProps = ConfigFactory.parseProperties(es_properties); - Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); - StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); - testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe); - testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient(); - - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0)); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertTrue(indicesExistsResponse.isExists()); - - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getSource().getIndexes().get(0)) - .setTypes(testConfiguration.getSource().getTypes().get(0)); - SearchResponse countResponse = countRequest.execute().actionGet(); - - count = (int)countResponse.getHits().getTotalHits(); - - PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings"); - URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json"); - ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class); - String templateSource = MAPPER.writeValueAsString(template); - putTemplateRequestBuilder.setSource(templateSource); - - testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet(); - - assertNotEquals(count, 0); - - } - - @Test - public void testReindex() throws Exception { - - ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration); - - reindex.run(); - - // assert lines in file - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getDestination().getIndex()) - .setTypes(testConfiguration.getDestination().getType()); - SearchResponse countResponse = countRequest.execute().actionGet(); - - assertEquals(count, (int)countResponse.getHits().getTotalHits()); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java deleted file mode 100644 index 9c46d31..0000000 --- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.apache.streams.example.elasticsearch.test; - -import org.apache.streams.elasticsearch.test.ElasticsearchParentChildWriterIT; -import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT; -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - -@RunWith(Suite.class) [email protected]({ - ElasticsearchPersistWriterIT.class, - ElasticsearchParentChildWriterIT.class, - ElasticsearchReindexIT.class, - ElasticsearchReindexParentIT.class, - ElasticsearchReindexChildIT.class -}) - -public class ReindexITs { - // the class remains empty, - // used only as a holder for the above annotations -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java ---------------------------------------------------------------------- diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java new file mode 100644 index 0000000..47c8f51 --- /dev/null +++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.example.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.elasticsearch.ElasticsearchClientManager; +import org.apache.streams.example.ElasticsearchReindex; +import org.apache.streams.example.ElasticsearchReindexConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Test copying parent/child associated documents between two indexes on same cluster + */ +public class ElasticsearchReindexChildIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class); + + ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + protected ElasticsearchReindexConfiguration testConfiguration; + protected Client testClient; + + private int count = 0; + + @Before + public void prepareTest() throws Exception { + + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Properties es_properties = new Properties(); + InputStream es_stream = new FileInputStream("elasticsearch.properties"); + es_properties.load(es_stream); + Config esProps = ConfigFactory.parseProperties(es_properties); + Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); + StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); + testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe); + testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient(); + + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0)); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + assertTrue(indicesExistsResponse.isExists()); + + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getSource().getIndexes().get(0)) + .setTypes(testConfiguration.getSource().getTypes().get(0)); + SearchResponse countResponse = countRequest.execute().actionGet(); + + count = (int)countResponse.getHits().getTotalHits(); + + assertNotEquals(count, 0); + + } + + @Test + public void testReindex() throws Exception { + + ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration); + + reindex.run(); + + // assert lines in file + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getDestination().getIndex()) + .setTypes(testConfiguration.getDestination().getType()); + SearchResponse countResponse = countRequest.execute().actionGet(); + + assertEquals(count, (int)countResponse.getHits().getTotalHits()); + + } + +}
