This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cb7eb725fe Optimize flink oracle cdc, add flink read es to doris sample code (#11274)
cb7eb725fe is described below
commit cb7eb725fe2fb197200a1bcc19fd180f566af907
Author: caoliang-web <[email protected]>
AuthorDate: Fri Jul 29 18:11:18 2022 +0800
Optimize flink oracle cdc, add flink read es to doris sample code (#11274)
---
samples/doris-demo/flink-demo/pom.xml | 16 ++
.../doris/demo/flink/cdc/FlinkOracleCdcDemo.java | 4 -
.../flink/elasticsearch/ElasticsearchInput.java | 248 +++++++++++++++++++++
.../flink/elasticsearch/FlinkReadEs2Doris.java | 100 +++++++++
4 files changed, 364 insertions(+), 4 deletions(-)
diff --git a/samples/doris-demo/flink-demo/pom.xml b/samples/doris-demo/flink-demo/pom.xml
index 0f662acc41..586a86c68d 100644
--- a/samples/doris-demo/flink-demo/pom.xml
+++ b/samples/doris-demo/flink-demo/pom.xml
@@ -118,6 +118,22 @@ under the License.
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>2.1.1</version>
</dependency>
+ <!-- elasticsearch -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.13</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.12</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/cdc/FlinkOracleCdcDemo.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/cdc/FlinkOracleCdcDemo.java
index 5a1045a029..92350a14f3 100644
--- a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/cdc/FlinkOracleCdcDemo.java
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/cdc/FlinkOracleCdcDemo.java
@@ -104,10 +104,6 @@ public class FlinkOracleCdcDemo {
LogicalType[] types={new IntType(),new VarCharType(),new VarCharType(), new DoubleType()};
- Properties pro = new Properties();
- pro.setProperty("format", "json");
- pro.setProperty("strip_outer_array", "false");
-
map.addSink(
DorisSink.sink(
fields,
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/ElasticsearchInput.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/ElasticsearchInput.java
new file mode 100644
index 0000000000..33c9567b53
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/ElasticsearchInput.java
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.elasticsearch;
+
+import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.Scroll;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Custom Elasticsearch source: a Flink RichInputFormat that reads an index via the scroll API.
+ */
+
+public class ElasticsearchInput extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable<Row> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchInput.class);
+
+ private final List<HttpHost> httpHosts;
+ private final RestClientFactory restClientFactory;
+ private final String index;
+
+ private transient RestHighLevelClient client;
+
+ private String scrollId;
+ private SearchRequest searchRequest;
+
+ private RowTypeInfo rowTypeInfo;
+
+ private boolean hasNext;
+
+ private Iterator<Map<String, Object>> iterator;
+ private Map<String, Integer> position;
+
+
+ public ElasticsearchInput(List<HttpHost> httpHosts,
+ RestClientFactory restClientFactory, String index) {
+ this.httpHosts = httpHosts;
+ this.restClientFactory = restClientFactory;
+ this.index = index;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+ return cachedStatistics;
+ }
+
+ @Override
+ public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
+ return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
+ return new DefaultInputSplitAssigner(inputSplits);
+ }
+
+ @Override
+ public void open(InputSplit split) throws IOException {
+ search();
+ }
+
+ // fetch the next batch from Elasticsearch: the first call opens a scroll, later calls continue it
+ protected void search() throws IOException{
+ SearchResponse searchResponse;
+ if(scrollId == null){
+ searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+ scrollId = searchResponse.getScrollId();
+ }else{
+ searchResponse = client.searchScroll(new SearchScrollRequest(scrollId),RequestOptions.DEFAULT);
+ }
+
+ if(searchResponse == null || searchResponse.getHits().getTotalHits().value < 1){
+ hasNext = false;
+ return;
+ }
+
+ hasNext = true;
+ iterator = Arrays.stream(searchResponse.getHits().getHits())
+ .map(t -> t.getSourceAsMap())
+ .collect(Collectors.toList()).iterator();
+ }
+
+
+
+ @Override
+ public void openInputFormat() throws IOException {
+ RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
+ restClientFactory.configureRestClientBuilder(builder);
+ client = new RestHighLevelClient(builder);
+
+ position = new CaseInsensitiveMap();
+ int i = 0;
+ for(String name : rowTypeInfo.getFieldNames()){
+ position.put(name, i++);
+ }
+
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(QueryBuilders.matchAllQuery());
+ // The test data set is small, so a small scroll batch size is sufficient here
+ searchSourceBuilder.size(50);
+
+ searchSourceBuilder.fetchSource(rowTypeInfo.getFieldNames(), null);
+
+ // Get data using scroll api
+ Scroll scroll = new Scroll(TimeValue.timeValueMinutes(3l));
+
+ searchRequest = new SearchRequest();
+ searchRequest.indices(index);
+ searchRequest.scroll(scroll);
+ searchRequest.source(searchSourceBuilder);
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return !hasNext;
+ }
+
+ @Override
+ public Row nextRecord(Row reuse) throws IOException {
+ if(!hasNext) return null;
+
+ if(!iterator.hasNext()){
+ this.search();
+ if(!hasNext || !iterator.hasNext()){
+ hasNext = false;
+ return null;
+ }
+ }
+
+ for(Map.Entry<String, Object> entry: iterator.next().entrySet()){
+ Integer p = position.get(entry.getKey());
+ if(p == null) throw new IOException("unknown field "+entry.getKey());
+
+ reuse.setField(p, entry.getValue());
+ }
+
+ return reuse;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(client == null)
+ return;
+
+ iterator = null;
+ ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+ clearScrollRequest.addScrollId(scrollId);
+ try {
+ client.clearScroll(clearScrollRequest,RequestOptions.DEFAULT).isSucceeded();
+ client.close();
+ client = null;
+ } catch (Exception exception) {
+ if(!(exception.getMessage()).contains("Unable to parse response body")){
+ throw exception;
+ }
+ }
+ }
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return rowTypeInfo;
+ }
+
+ public static Builder builder(List<HttpHost> httpHosts, String index){
+ return new Builder(httpHosts, index);
+ }
+
+ @PublicEvolving
+ public static class Builder {
+ private final List<HttpHost> httpHosts;
+ private String index;
+ private RowTypeInfo rowTypeInfo;
+ private RestClientFactory restClientFactory = restClientBuilder -> {
+ };
+
+ public Builder(List<HttpHost> httpHosts, String index) {
+ this.httpHosts = Preconditions.checkNotNull(httpHosts);
+ this.index = index;
+ }
+
+
+ public Builder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
+ this.rowTypeInfo = rowTypeInfo;
+ return this;
+ }
+
+
+ public ElasticsearchInput build() {
+ Preconditions.checkNotNull(this.rowTypeInfo);
+ ElasticsearchInput input = new ElasticsearchInput(httpHosts, restClientFactory, index);
+ input.rowTypeInfo = this.rowTypeInfo;
+ return input;
+ }
+
+ }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/FlinkReadEs2Doris.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/FlinkReadEs2Doris.java
new file mode 100644
index 0000000000..4d47fd9b19
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/FlinkReadEs2Doris.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.elasticsearch;
+
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.cfg.DorisSink;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.http.HttpHost;
+
+import java.util.Properties;
+
+
+/**
+ * Flink reads Elasticsearch data and synchronizes it to Doris (flink-doris-connector method).
+ */
+public class FlinkReadEs2Doris {
+
+ private static final String host = "127.0.0.1";
+ private static final String index = "commodity";
+
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ RowTypeInfo rowTypeInfo = new RowTypeInfo(
+ new TypeInformation[]{Types.INT, Types.STRING, Types.STRING,Types.STRING,Types.DOUBLE},
+ new String[]{"commodity_id", "commodity_name", "create_time","picture_url","price"});
+
+ ElasticsearchInput es = ElasticsearchInput.builder(
+ Lists.newArrayList(new HttpHost(host, 9200)),
+ index)
+ .setRowTypeInfo(rowTypeInfo)
+ .build();
+
+ DataStreamSource<Row> source = env.createInput(es);
+
+ SingleOutputStreamOperator<String> map = source.map(new MapFunction<Row, String>() {
+ @Override
+ public String map(Row row) throws Exception {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("commodity_id", row.getField(0));
+ jsonObject.put("commodity_name", row.getField(1));
+ jsonObject.put("create_time", row.getField(2));
+ jsonObject.put("picture_url", row.getField(3));
+ jsonObject.put("price", row.getField(4));
+ return jsonObject.toJSONString();
+ }
+ });
+
+ Properties pro = new Properties();
+ pro.setProperty("format", "json");
+ pro.setProperty("strip_outer_array", "true");
+
+ map.addSink(
+ DorisSink.sink(
+ DorisReadOptions.builder().build(),
+ DorisExecutionOptions.builder()
+ .setMaxRetries(3)
+ .setStreamLoadProp(pro).build(),
+ DorisOptions.builder()
+ .setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("test.test_es")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+
+ env.execute("test_es");
+ }
+
+
+
+
+}
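
A quick way to try the new sample: the sketch below (not part of this commit) reads the same "commodity" index with the new ElasticsearchInput and prints the rows, so the Elasticsearch side can be verified before adding the DorisSink shown in FlinkReadEs2Doris. The host/port, index name, field list, and the EsReadSmokeTest class name are assumptions mirroring the demo defaults; adjust them to your environment.

    // Hypothetical smoke test, assuming an ES node at 127.0.0.1:9200 and a
    // "commodity" index shaped like the demo data; not part of this commit.
    package org.apache.doris.demo.flink.elasticsearch;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.http.HttpHost;

    public class EsReadSmokeTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // Field names and types must match the documents in the target index.
            RowTypeInfo rowTypeInfo = new RowTypeInfo(
                    new TypeInformation[]{Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.DOUBLE},
                    new String[]{"commodity_id", "commodity_name", "create_time", "picture_url", "price"});

            // Print rows instead of sinking to Doris, to confirm the scroll read
            // works before wiring up the DorisSink from FlinkReadEs2Doris.
            env.createInput(ElasticsearchInput.builder(
                            Lists.newArrayList(new HttpHost("127.0.0.1", 9200)), "commodity")
                    .setRowTypeInfo(rowTypeInfo)
                    .build())
                    .print();

            env.execute("es-read-smoke-test");
        }
    }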