Repository: incubator-streams-examples Updated Branches: refs/heads/master b3429dd4b -> 6e93a8f7a
resolve STREAMS-408 resolve STREAMS-408 Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/4d5d9b0b Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/4d5d9b0b Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/4d5d9b0b Branch: refs/heads/master Commit: 4d5d9b0ba189b4fecda0e1f27acfb55b4fff1165 Parents: b3429dd Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Mon Oct 3 16:51:47 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Mon Oct 3 16:51:47 2016 -0500 ---------------------------------------------------------------------- .../src/site/markdown/ElasticsearchHdfs.md | 2 +- .../src/site/markdown/HdfsElasticsearch.md | 2 +- .../src/site/markdown/index.md | 2 +- .../example/MongoElasticsearchSync.java | 79 ++++++++++++++++++ .../elasticsearch/MongoElasticsearchSync.java | 84 -------------------- .../test/MongoElasticsearchSyncIT.java | 10 +-- 6 files changed, 83 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4d5d9b0b/local/elasticsearch-hdfs/src/site/markdown/ElasticsearchHdfs.md ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/site/markdown/ElasticsearchHdfs.md b/local/elasticsearch-hdfs/src/site/markdown/ElasticsearchHdfs.md index e524e2d..6db4329 100644 --- a/local/elasticsearch-hdfs/src/site/markdown/ElasticsearchHdfs.md +++ b/local/elasticsearch-hdfs/src/site/markdown/ElasticsearchHdfs.md @@ -24,7 +24,7 @@ Example Configuration: Run (Local): ------------ - java -cp dist/elasticsearch-hdfs-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.example.elasticsearch.ElasticsearchHdfs + java -cp dist/elasticsearch-hdfs-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.elasticsearch.example.ElasticsearchHdfs Run (Docker): ------------- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4d5d9b0b/local/elasticsearch-hdfs/src/site/markdown/HdfsElasticsearch.md ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/src/site/markdown/HdfsElasticsearch.md b/local/elasticsearch-hdfs/src/site/markdown/HdfsElasticsearch.md index f3778fc..2f90e44 100644 --- a/local/elasticsearch-hdfs/src/site/markdown/HdfsElasticsearch.md +++ b/local/elasticsearch-hdfs/src/site/markdown/HdfsElasticsearch.md @@ -24,7 +24,7 @@ Example Configuration: Run (Local): ------------ - java -cp dist/elasticsearch-hdfs-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.example.elasticsearch.HdfsElasticsearch + java -cp dist/elasticsearch-hdfs-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.elasticsearch.example.HdfsElasticsearch Run (Docker): ------------- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4d5d9b0b/local/elasticsearch-reindex/src/site/markdown/index.md ---------------------------------------------------------------------- diff --git a/local/elasticsearch-reindex/src/site/markdown/index.md b/local/elasticsearch-reindex/src/site/markdown/index.md index 83eb123..d573137 100644 --- a/local/elasticsearch-reindex/src/site/markdown/index.md +++ b/local/elasticsearch-reindex/src/site/markdown/index.md @@ -36,7 +36,7 @@ Build: Run (Local): ------------ - java -cp dist/elasticsearch-reindex-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.example.elasticsearch.ElasticsearchReindex + java -cp dist/elasticsearch-reindex-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.elasticsearch.example.ElasticsearchReindex Deploy (Docker): ---------------- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4d5d9b0b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/elasticsearch/example/MongoElasticsearchSync.java ---------------------------------------------------------------------- diff --git a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/elasticsearch/example/MongoElasticsearchSync.java b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/elasticsearch/example/MongoElasticsearchSync.java new file mode 100644 index 0000000..f77ecce --- /dev/null +++ b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/elasticsearch/example/MongoElasticsearchSync.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.elasticsearch.example; + +import com.google.common.collect.Maps; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.elasticsearch.*; +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.example.elasticsearch.MongoElasticsearchSyncConfiguration; +import org.apache.streams.local.builders.LocalStreamBuilder; +import org.apache.streams.mongo.MongoPersistReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Copies documents into a new index + */ +public class MongoElasticsearchSync implements Runnable { + + public final static String STREAMS_ID = "MongoElasticsearchSync"; + + private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class); + + MongoElasticsearchSyncConfiguration config; + + public MongoElasticsearchSync() { + this(new ComponentConfigurator<MongoElasticsearchSyncConfiguration>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + } + + public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) { + this.config = config; + } + + public static void main(String[] args) + { + LOGGER.info(StreamsConfigurator.config.toString()); + + MongoElasticsearchSync sync = new MongoElasticsearchSync(); + + new Thread(sync).start(); + + } + + @Override + public void run() { + + MongoPersistReader mongoPersistReader = new MongoPersistReader(config.getSource()); + + ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination()); + + Map<String, Object> streamConfig = Maps.newHashMap(); + streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID); + streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000); + StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig); + + builder.newPerpetualStream(MongoPersistReader.STREAMS_ID, mongoPersistReader); + builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, MongoPersistReader.STREAMS_ID); + builder.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4d5d9b0b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java ---------------------------------------------------------------------- diff --git a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java deleted file mode 100644 index fccbf47..0000000 --- a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.example.elasticsearch; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.elasticsearch.*; -import org.apache.streams.core.StreamBuilder; -import org.apache.streams.local.builders.LocalStreamBuilder; -import org.apache.streams.mongo.MongoPersistReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.concurrent.*; - -/** - * Copies documents into a new index - */ -public class MongoElasticsearchSync implements Runnable { - - public final static String STREAMS_ID = "MongoElasticsearchSync"; - - private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class); - - MongoElasticsearchSyncConfiguration config; - - public MongoElasticsearchSync() { - this(new ComponentConfigurator<MongoElasticsearchSyncConfiguration>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); - } - - public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) { - this.config = config; - } - - public static void main(String[] args) - { - LOGGER.info(StreamsConfigurator.config.toString()); - - MongoElasticsearchSync sync = new MongoElasticsearchSync(); - - new Thread(sync).start(); - - } - - @Override - public void run() { - - MongoPersistReader mongoPersistReader = new MongoPersistReader(config.getSource()); - - ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination()); - - Map<String, Object> streamConfig = Maps.newHashMap(); - streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID); - streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000); - StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig); - - builder.newPerpetualStream(MongoPersistReader.STREAMS_ID, mongoPersistReader); - builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, MongoPersistReader.STREAMS_ID); - builder.start(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4d5d9b0b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/elasticsearch/test/MongoElasticsearchSyncIT.java ---------------------------------------------------------------------- diff --git a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/elasticsearch/test/MongoElasticsearchSyncIT.java b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/elasticsearch/test/MongoElasticsearchSyncIT.java index df6fa00..50f9c4c 100644 --- a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/elasticsearch/test/MongoElasticsearchSyncIT.java +++ b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/elasticsearch/test/MongoElasticsearchSyncIT.java @@ -19,20 +19,12 @@ package org.apache.streams.example.elasticsearch.test; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; -import org.apache.streams.elasticsearch.ElasticsearchConfiguration; -import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; -import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; -import org.apache.streams.example.elasticsearch.MongoElasticsearchSync; +import org.apache.streams.elasticsearch.example.MongoElasticsearchSync; import org.apache.streams.example.elasticsearch.MongoElasticsearchSyncConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.mongo.MongoConfiguration; -import org.apache.streams.mongo.MongoPersistReader; import org.apache.streams.mongo.MongoPersistWriter; import org.apache.streams.pojo.json.Activity; import org.elasticsearch.test.ElasticsearchIntegrationTest;
