[FLINK-3511] [nifi, elasticsearch] Move nifi and elasticsearch examples to test scope
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dc82408 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dc82408 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dc82408 Branch: refs/heads/release-1.0 Commit: 0dc824080f38d83d9a748d19d04344c3bf4d7077 Parents: 434cff0 Author: Till Rohrmann <[email protected]> Authored: Fri Feb 26 16:21:13 2016 +0100 Committer: Robert Metzger <[email protected]> Committed: Fri Feb 26 20:57:36 2016 +0100 ---------------------------------------------------------------------- .../examples/ElasticsearchExample.java | 81 -------------------- .../examples/ElasticsearchExample.java | 81 ++++++++++++++++++++ .../nifi/examples/NiFiSinkTopologyExample.java | 55 ------------- .../examples/NiFiSourceTopologyExample.java | 58 -------------- .../nifi/examples/NiFiSinkTopologyExample.java | 55 +++++++++++++ .../examples/NiFiSourceTopologyExample.java | 58 ++++++++++++++ 6 files changed, 194 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0dc82408/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java deleted file mode 100644 index f8f658b..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.elasticsearch.examples; - -import com.google.common.collect.Maps; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink; -import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -/** - * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that - * you have a cluster named "elasticsearch" running or change the cluster name in the config map. - */ -public class ElasticsearchExample { - - public static void main(String[] args) throws Exception { - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<String> source = env.addSource(new SourceFunction<String>() { - private static final long serialVersionUID = 1L; - - private volatile boolean running = true; - - @Override - public void run(SourceContext<String> ctx) throws Exception { - for (int i = 0; i < 20 && running; i++) { - ctx.collect("message #" + i); - } - } - - @Override - public void cancel() { - running = false; - } - }); - - Map<String, String> config = Maps.newHashMap(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() { - @Override - public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .source(json); - } - })); - - - env.execute("Elasticsearch Example"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0dc82408/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java new file mode 100644 index 0000000..f8f658b --- /dev/null +++ b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.examples; + +import com.google.common.collect.Maps; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.util.HashMap; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that + * you have a cluster named "elasticsearch" running or change the cluster name in the config map. + */ +public class ElasticsearchExample { + + public static void main(String[] args) throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<String> source = env.addSource(new SourceFunction<String>() { + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext<String> ctx) throws Exception { + for (int i = 0; i < 20 && running; i++) { + ctx.collect("message #" + i); + } + } + + @Override + public void cancel() { + running = false; + } + }); + + Map<String, String> config = Maps.newHashMap(); + // This instructs the sink to emit after every element, otherwise they would be buffered + config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() { + @Override + public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .source(json); + } + })); + + + env.execute("Elasticsearch Example"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0dc82408/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java deleted file mode 100644 index 572f949..0000000 --- a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java +++ /dev/null @@ -1,55 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.flink.streaming.connectors.nifi.examples; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket; -import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder; -import org.apache.flink.streaming.connectors.nifi.NiFiSink; -import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket; -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.client.SiteToSiteClientConfig; - -import java.util.HashMap; - -/** - * An example topology that sends data to a NiFi input port named "Data from Flink". - */ -public class NiFiSinkTopologyExample { - - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data from Flink") - .buildConfig(); - - DataStreamSink<String> dataStream = env.fromElements("one", "two", "three", "four", "five", "q") - .addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() { - @Override - public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) { - return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>()); - } - })); - - env.execute(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/0dc82408/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java deleted file mode 100644 index 79c9a1c..0000000 --- a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java +++ /dev/null @@ -1,58 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.flink.streaming.connectors.nifi.examples; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket; -import org.apache.flink.streaming.connectors.nifi.NiFiSource; -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.client.SiteToSiteClientConfig; - -import java.nio.charset.Charset; - -/** - * An example topology that pulls data from a NiFi output port named "Data for Flink". - */ -public class NiFiSourceTopologyExample { - - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data for Flink") - .requestBatchCount(5) - .buildConfig(); - - SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig); - DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2); - - DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() { - @Override - public String map(NiFiDataPacket value) throws Exception { - return new String(value.getContent(), Charset.defaultCharset()); - } - }); - - dataStream.print(); - env.execute(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/0dc82408/flink-streaming-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-streaming-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java new file mode 100644 index 0000000..572f949 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java @@ -0,0 +1,55 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.flink.streaming.connectors.nifi.examples; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket; +import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder; +import org.apache.flink.streaming.connectors.nifi.NiFiSink; +import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; + +import java.util.HashMap; + +/** + * An example topology that sends data to a NiFi input port named "Data from Flink". + */ +public class NiFiSinkTopologyExample { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("Data from Flink") + .buildConfig(); + + DataStreamSink<String> dataStream = env.fromElements("one", "two", "three", "four", "five", "q") + .addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() { + @Override + public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) { + return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>()); + } + })); + + env.execute(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/0dc82408/flink-streaming-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java b/flink-streaming-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java new file mode 100644 index 0000000..79c9a1c --- /dev/null +++ b/flink-streaming-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java @@ -0,0 +1,58 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.flink.streaming.connectors.nifi.examples; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket; +import org.apache.flink.streaming.connectors.nifi.NiFiSource; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; + +import java.nio.charset.Charset; + +/** + * An example topology that pulls data from a NiFi output port named "Data for Flink". + */ +public class NiFiSourceTopologyExample { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("Data for Flink") + .requestBatchCount(5) + .buildConfig(); + + SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig); + DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2); + + DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() { + @Override + public String map(NiFiDataPacket value) throws Exception { + return new String(value.getContent(), Charset.defaultCharset()); + } + }); + + dataStream.print(); + env.execute(); + } + +}
