This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 560ae8e1 [ISSUE #346] Add elasticsearch source connector (#347)
560ae8e1 is described below
commit 560ae8e145d7de686467a54cb374e6c4b4e24803
Author: Oliver <[email protected]>
AuthorDate: Mon Oct 31 14:16:59 2022 +0800
[ISSUE #346] Add elasticsearch source connector (#347)
---
.../rocketmq-connect-elasticsearch/README.md | 31 +++
connectors/rocketmq-connect-elasticsearch/pom.xml | 231 +++++++++++++++++++++
.../elasticsearch/config/ElasticsearchConfig.java | 121 +++++++++++
.../config/ElasticsearchConstant.java | 41 ++++
.../connector/ElasticsearchSourceConnector.java | 83 ++++++++
.../connector/ElasticsearchSourceTask.java | 135 ++++++++++++
.../replicator/source/ElasticsearchQuery.java | 154 ++++++++++++++
.../replicator/source/ElasticsearchReplicator.java | 64 ++++++
.../config/ElasticsearchConfigTest.java | 47 +++++
.../ElasticsearchSourceConnectorTest.java | 44 ++++
.../replicator/source/ElasticsearchQueryTest.java | 48 +++++
11 files changed, 999 insertions(+)
diff --git a/connectors/rocketmq-connect-elasticsearch/README.md
b/connectors/rocketmq-connect-elasticsearch/README.md
new file mode 100644
index 00000000..c5c0d89e
--- /dev/null
+++ b/connectors/rocketmq-connect-elasticsearch/README.md
@@ -0,0 +1,31 @@
+##### ElasticsearchSourceConnector fully-qualified name
+org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSourceConnector
+
+**elasticsearch-source-connector** start
+
+```
+POST http://${runtime-ip}:${runtime-port}/connectors/elasticsearchSourceConnector
+{
+  "connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSourceConnector",
+  "elasticsearchHost":"localhost",
+  "elasticsearchPort":9200,
+  "index":{
+    "aolifu_connect": {
+      "primaryShards":1,
+      "id":1
+    }
+  },
+  "max.tasks":1,
+  "connect.topicname":"configInfo",
+  "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.StringConverter",
+  "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.StringConverter"
+}
+```
+
+##### parameter configuration
+
+parameter | effect | required | default
+---|---|---|---
+elasticsearchHost | The host of the Elasticsearch server | yes | null
+elasticsearchPort | The port of the Elasticsearch server | yes | null
+index | A JSON object keyed by index name. Each entry holds `primaryShards` (the number of primary shards of the index) and exactly one other key: the name of the incrementing field used for range queries, mapped to the value to start reading from (e.g. `"id":1`) | yes | null
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-elasticsearch/pom.xml
b/connectors/rocketmq-connect-elasticsearch/pom.xml
new file mode 100644
index 00000000..557a7281
--- /dev/null
+++ b/connectors/rocketmq-connect-elasticsearch/pom.xml
@@ -0,0 +1,231 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-connect-elasticsearch</artifactId>
    <version>1.0.0</version>

    <name>connect-elasticsearch</name>
    <url>https://github.com/apache/rocketmq-connect/tree/master/connectors/rocketmq-connect-elasticsearch</url>

    <licenses>
        <license>
            <name>The Apache Software License, Version 2.0</name>
            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
        </license>
    </licenses>

    <issueManagement>
        <system>jira</system>
        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
    </issueManagement>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!-- Compiler settings properties -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>versions-maven-plugin</artifactId>
                <version>2.3</version>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>clirr-maven-plugin</artifactId>
                <version>2.7</version>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <compilerVersion>${maven.compiler.source}</compilerVersion>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19.1</version>
                <configuration>
                    <argLine>-Xms512m -Xmx1024m</argLine>
                    <!-- A fresh JVM per test class: the documented replacement for the
                         deprecated forkMode=always. -->
                    <forkCount>1</forkCount>
                    <reuseForks>false</reuseForks>
                    <includes>
                        <include>**/*Test.java</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-site-plugin</artifactId>
                <version>3.6</version>
                <configuration>
                    <locales>en_US</locales>
                    <outputEncoding>UTF-8</outputEncoding>
                    <inputEncoding>UTF-8</inputEncoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-source-plugin</artifactId>
                <version>3.0.1</version>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>2.10.4</version>
                <configuration>
                    <charset>UTF-8</charset>
                    <locale>en_US</locale>
                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
                </configuration>
                <executions>
                    <execution>
                        <id>aggregate</id>
                        <goals>
                            <goal>aggregate</goal>
                        </goals>
                        <phase>site</phase>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.0.2</version>
                <configuration>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>findbugs-maven-plugin</artifactId>
                <version>3.0.4</version>
            </plugin>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
                <version>0.12</version>
                <configuration>
                    <excludes>
                        <exclude>README.md</exclude>
                        <exclude>README-CN.md</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.23.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>4.8.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.openmessaging</groupId>
            <artifactId>openmessaging-connector</artifactId>
            <version>0.1.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
        </dependency>
        <!-- logback-classic and logback-core must share one version; 1.2.0 was both
             mismatched with core and affected by CVE-2021-42550 (fixed in 1.2.8). -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.9</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.9</version>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.6.2</version>
        </dependency>

    </dependencies>

</project>
diff --git
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfig.java
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfig.java
new file mode 100644
index 00000000..0f06b162
--- /dev/null
+++
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfig.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.config;
+
import io.openmessaging.KeyValue;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+
+public class ElasticsearchConfig {
+
+ public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+ {
+ add(ElasticsearchConstant.ES_HOST);
+ add(ElasticsearchConstant.ES_PORT);
+ add(ElasticsearchConstant.INDEX);
+ }
+ };
+
+ private String index;
+
+ private String elasticsearchHost;
+
+ private Integer elasticsearchPort;
+
+ /**
+ * key is indexName, value is field name
+ */
+ private Map<String, String> indexMap = new HashMap<>();
+
+ public String getIndex() {
+ return index;
+ }
+
+ public void setIndex(String index) {
+ this.index = index;
+ }
+
+ public String getElasticsearchHost() {
+ return elasticsearchHost;
+ }
+
+ public void setElasticsearchHost(String elasticsearchHost) {
+ this.elasticsearchHost = elasticsearchHost;
+ }
+
+ public Integer getElasticsearchPort() {
+ return elasticsearchPort;
+ }
+
+ public void setElasticsearchPort(Integer elasticsearchPort) {
+ this.elasticsearchPort = elasticsearchPort;
+ }
+
+ public Map<String, String> getIndexMap() {
+ return indexMap;
+ }
+
+ public void load(KeyValue props) {
+
+ properties2Object(props, this);
+ }
+
+ private void properties2Object(final KeyValue p, final Object object) {
+
+ Method[] methods = object.getClass().getMethods();
+ for (Method method : methods) {
+ String mn = method.getName();
+ if (mn.startsWith("set")) {
+ try {
+ String tmp = mn.substring(4);
+ String first = mn.substring(3, 4);
+
+ String key = first.toLowerCase() + tmp;
+ String property = p.getString(key);
+ if (property != null) {
+ Class<?>[] pt = method.getParameterTypes();
+ if (pt != null && pt.length > 0) {
+ String cn = pt[0].getSimpleName();
+ Object arg;
+ if (cn.equals("int") || cn.equals("Integer")) {
+ arg = Integer.parseInt(property);
+ } else if (cn.equals("long") || cn.equals("Long"))
{
+ arg = Long.parseLong(property);
+ } else if (cn.equals("double") ||
cn.equals("Double")) {
+ arg = Double.parseDouble(property);
+ } else if (cn.equals("boolean") ||
cn.equals("Boolean")) {
+ arg = Boolean.parseBoolean(property);
+ } else if (cn.equals("float") ||
cn.equals("Float")) {
+ arg = Float.parseFloat(property);
+ } else if (cn.equals("String")) {
+ arg = property;
+ } else {
+ continue;
+ }
+ method.invoke(object, arg);
+ }
+ }
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+}
diff --git
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConstant.java
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConstant.java
new file mode 100644
index 00000000..5a974a31
--- /dev/null
+++
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConstant.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.config;
+
/**
 * Configuration keys and offset/partition identifiers shared by the Elasticsearch source
 * connector and its tasks. Not instantiable.
 */
public final class ElasticsearchConstant {

    public static final String ES_DOC = "doc";

    /** Key and value of the record partition used by source tasks. */
    public static final String ES_PARTITION = "ES_PARTITION";

    /** Suffix of the offset key ({@code <index>:ES_POSITION}) holding an index's position. */
    public static final String ES_POSITION = "ES_POSITION";

    /** Config key: host of the Elasticsearch server. */
    public static final String ES_HOST = "elasticsearchHost";

    /** Config key: port of the Elasticsearch server. */
    public static final String ES_PORT = "elasticsearchPort";

    /** Config key: JSON index description (connector) or a single index name (task). */
    public static final String INDEX = "index";

    /** Task config key: name of the incrementing field used for range queries. */
    public static final String INCREMENT_FIELD = "incrementField";

    /** Task config key: inclusive lower bound of the incrementing field. */
    public static final String INCREMENT = "increment";

    /** Key inside an index entry of the JSON description: number of primary shards. */
    public static final String PRIMARY_SHARDS = "primaryShards";

    /** Task config key: shard id(s) read by the task, passed as the {@code _shards} preference. */
    public static final String PRIMARY_SHARD = "primaryShard";

    private ElasticsearchConstant() {
        // constants holder
    }
}
diff --git
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnector.java
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnector.java
new file mode 100644
index 00000000..ad750d20
--- /dev/null
+++
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnector.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.connector;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConfig;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConstant;
+
+public class ElasticsearchSourceConnector extends SourceConnector {
+
+ private KeyValue keyValue;
+
+ private ElasticsearchConfig config;
+
+ @Override
+ public List<KeyValue> taskConfigs(int maxTasks) {
+ this.config = new ElasticsearchConfig();
+ this.config.load(keyValue);
+ List<KeyValue> configs = new ArrayList<>();
+ JSONObject jsonObject = JSON.parseObject(this.config.getIndex());
+ for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
+ final String indexName = entry.getKey();
+ final JSONObject value = (JSONObject) entry.getValue();
+ Integer primaryShards =
value.getInteger(ElasticsearchConstant.PRIMARY_SHARDS);
+ primaryShards = primaryShards > maxTasks ? maxTasks :
primaryShards;
+ for (int i = 0; i < primaryShards; i++) {
+ this.keyValue.put(ElasticsearchConstant.INDEX, indexName);
+ this.keyValue.put(ElasticsearchConstant.INCREMENT_FIELD,
value.keySet().stream()
+ .filter(item ->
!ElasticsearchConstant.PRIMARY_SHARDS.equals(item)).collect(Collectors.toList()).get(0));
+ final String id =
value.getString(this.keyValue.getString(ElasticsearchConstant.INCREMENT_FIELD));
+ this.keyValue.put(ElasticsearchConstant.INCREMENT, id);
+ this.keyValue.put(ElasticsearchConstant.PRIMARY_SHARD, i + "");
+ configs.add(this.keyValue);
+ }
+ }
+ return configs;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return ElasticsearchSourceTask.class;
+ }
+
+ @Override
+ public void start(KeyValue config) {
+
+ for (String requestKey : ElasticsearchConfig.REQUEST_CONFIG) {
+ if (!config.containsKey(requestKey)) {
+ throw new RuntimeException("Request config key: " +
requestKey);
+ }
+ }
+ this.keyValue = config;
+
+ }
+
+ @Override
+ public void stop() {
+ this.keyValue = null;
+ }
+}
diff --git
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceTask.java
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceTask.java
new file mode 100644
index 00000000..322e2e44
--- /dev/null
+++
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceTask.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.connector;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConfig;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConstant;
+import
org.apache.rocketmq.connect.elasticsearch.replicator.source.ElasticsearchReplicator;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchSourceTask extends SourceTask {
+
+ private static final Logger log =
LoggerFactory.getLogger(ElasticsearchSourceTask.class);
+
+ private ElasticsearchReplicator replicator;
+
+ private ElasticsearchConfig config;
+
+ @Override
+ public List<ConnectRecord> poll() {
+ List<ConnectRecord> res = new ArrayList<>();
+ try {
+ SearchHit searchHit = replicator.getQueue().poll(1000,
TimeUnit.MILLISECONDS);
+ if (searchHit != null) {
+ res.add(searchHit2ConnectRecord(searchHit));
+ }
+ } catch (Exception e) {
+ log.error("elasticsearch sourceTask poll error, current config:" +
JSON.toJSONString(config), e);
+ }
+ return res;
+ }
+
+ @Override
+ public void start(KeyValue keyValue) {
+ final RecordOffset recordOffset =
this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition());
+ this.config = new ElasticsearchConfig();
+ this.config.load(keyValue);
+ this.replicator = new ElasticsearchReplicator(config);
+ this.replicator.start(recordOffset, keyValue);
+ log.info("elasticsearch source task start success");
+ }
+
+ @Override
+ public void stop() {
+ replicator.stop();
+ log.info("elasticsearch source task stop success");
+ }
+
+ public ConnectRecord searchHit2ConnectRecord(SearchHit hit) {
+ Schema schema = SchemaBuilder.struct().name(hit.getIndex()).build();
+ final List<Field> fields = buildFields(hit);
+ schema.setFields(fields);
+ final ConnectRecord connectRecord = new
ConnectRecord(buildRecordPartition(),
+ buildRecordOffset(hit),
+ System.currentTimeMillis(),
+ schema,
+ hit.getSourceAsString());
+ return connectRecord;
+ }
+
+ private List<Field> buildFields(SearchHit hit) {
+ List<Field> fields = new ArrayList<>();
+ final Map<String, Object> map = hit.getSourceAsMap();
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ fields.add(new Field(0, entry.getKey(),
getSchema(entry.getValue())));
+ }
+ return fields;
+ }
+
+ private RecordPartition buildRecordPartition() {
+ Map<String, String> partitionMap = new HashMap<>();
+ partitionMap.put(ElasticsearchConstant.ES_PARTITION,
ElasticsearchConstant.ES_PARTITION);
+ RecordPartition recordPartition = new RecordPartition(partitionMap);
+ return recordPartition;
+ }
+
+ private RecordOffset buildRecordOffset(SearchHit hit) {
+ Map<String, Long> offsetMap = new HashMap<>();
+ Object value =
JSON.parseObject(hit.getSourceAsString()).get(config.getIndexMap().get(hit.getIndex()));
+ if (value == null) {
+ value = 1;
+ }
+ offsetMap.put(hit.getIndex() + ":" +
ElasticsearchConstant.ES_POSITION, Long.parseLong(value.toString()));
+ RecordOffset recordOffset = new RecordOffset(offsetMap);
+ return recordOffset;
+ }
+
+ private Schema getSchema(Object obj) {
+ if (obj instanceof Integer) {
+ return SchemaBuilder.int32().build();
+ } else if (obj instanceof Long) {
+ return SchemaBuilder.int64().build();
+ } else if (obj instanceof String) {
+ return SchemaBuilder.string().build();
+ } else if (obj instanceof Date) {
+ return SchemaBuilder.time().build();
+ } else if (obj instanceof Timestamp) {
+ return SchemaBuilder.timestamp().build();
+ }
+ return SchemaBuilder.string().build();
+ }
+
+}
diff --git
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQuery.java
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQuery.java
new file mode 100644
index 00000000..718000a7
--- /dev/null
+++
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQuery.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.replicator.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.http.HttpHost;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConfig;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConstant;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchQuery {
+
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private ElasticsearchReplicator replicator;
+
+ private ElasticsearchConfig config;
+
+ private RestHighLevelClient client;
+
+ private RestClient restClient;
+
+ private ExecutorService executorService = Executors.newFixedThreadPool(5);
+
+ public ElasticsearchQuery(ElasticsearchReplicator replicator) {
+ this.replicator = replicator;
+ this.config = replicator.getConfig();
+ HttpHost httpHost = new HttpHost(config.getElasticsearchHost(),
config.getElasticsearchPort());
+ Node node = new Node(httpHost);
+ RestClientBuilder restClientBuilder = RestClient.builder(node);
+ restClient = restClientBuilder.build();
+ client = new RestHighLevelClient(restClientBuilder);
+ }
+
+ public void start(RecordOffset recordOffset, KeyValue keyValue) {
+ String scrollId = null;
+ while (true) {
+ if (scrollId == null) {
+ try {
+ final SearchResponse searchResponse =
this.searchData(recordOffset, keyValue);
+ final SearchHit[] hits =
searchResponse.getHits().getHits();
+ if (hits == null || hits.length < 1) {
+ break;
+ }
+ for (SearchHit hit : hits) {
+ replicator.getQueue().add(hit);
+ }
+ scrollId = searchResponse.getScrollId();
+
+ } catch (Exception e) {
+ logger.error("query Elasticsearch server failed", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ try {
+ final SearchResponse searchResponse =
scrollSearchData(scrollId);
+ final SearchHit[] hits = searchResponse.getHits().getHits();
+ if (hits == null || hits.length < 1) {
+ break;
+ }
+ for (SearchHit hit : hits) {
+ replicator.getQueue().add(hit);
+ }
+ scrollId = searchResponse.getScrollId();
+ } catch (Exception e) {
+ logger.error("scroll query Elasticsearch server occur error",
e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ private SearchResponse searchData(RecordOffset recordOffset, KeyValue
keyValue) {
+ SearchRequest searchRequest = new SearchRequest();
+ searchRequest.indices(keyValue.getString(ElasticsearchConstant.INDEX));
+ final String shard =
keyValue.getString(ElasticsearchConstant.PRIMARY_SHARD);
+ if (shard != null) {
+ searchRequest.preference("_shards:" + shard);
+ }
+ searchRequest.scroll(TimeValue.timeValueMinutes(1));
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ RangeQueryBuilder rangeQueryBuilder =
QueryBuilders.rangeQuery(keyValue.getString(ElasticsearchConstant.INCREMENT_FIELD))
+ .gte(keyValue.getString(ElasticsearchConstant.INCREMENT));
+ if (recordOffset != null && recordOffset.getOffset() != null &&
recordOffset.getOffset().size() > 0) {
+ final Long offsetValue = (Long)
recordOffset.getOffset().get(keyValue.getString(ElasticsearchConstant.INDEX) +
ElasticsearchConstant.ES_POSITION);
+ rangeQueryBuilder = rangeQueryBuilder.gte(offsetValue);
+ }
+ searchSourceBuilder.query(rangeQueryBuilder);
+ searchSourceBuilder.from(0).size(200);
+ searchRequest.source(searchSourceBuilder);
+ final SearchResponse searchResponse;
+ try {
+ searchResponse = client.search(searchRequest,
RequestOptions.DEFAULT);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return searchResponse;
+ }
+
+ private SearchResponse scrollSearchData(String scrollId) {
+ try {
+ SearchScrollRequest searchScrollRequest = new
SearchScrollRequest();
+ searchScrollRequest.scrollId(scrollId);
+ final SearchResponse searchResponse =
client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
+ return searchResponse;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void stop() {
+ try {
+ client.close();
+ executorService.shutdown();
+ } catch (IOException e) {
+ logger.error("close RestHighLevelClient occur error", e);
+ }
+ }
+
+}
diff --git
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchReplicator.java
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchReplicator.java
new file mode 100644
index 00000000..d9083d67
--- /dev/null
+++
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchReplicator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.replicator.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConfig;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Owns an {@link ElasticsearchQuery} and buffers the {@link SearchHit}s handed to it via
+ * {@link #commit(SearchHit)} in an unbounded queue exposed through {@link #getQueue()}.
+ */
+public class ElasticsearchReplicator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReplicator.class);
+
+    /** Connector configuration, returned by {@link #getConfig()}. */
+    private final ElasticsearchConfig config;
+
+    /** Query created by {@link #start}; {@code null} until start is called. */
+    private ElasticsearchQuery query;
+
+    /** Unbounded buffer of hits passed to {@link #commit(SearchHit)}. */
+    private final BlockingQueue<SearchHit> queue = new LinkedBlockingQueue<>();
+
+    /**
+     * @param config connector configuration for this replicator
+     */
+    public ElasticsearchReplicator(ElasticsearchConfig config) {
+        this.config = config;
+    }
+
+    /**
+     * Creates a new {@link ElasticsearchQuery} bound to this replicator and starts it.
+     *
+     * @param recordOffset previously committed offset passed through to the query; may be null
+     * @param keyValue task configuration passed through to the query
+     */
+    public void start(RecordOffset recordOffset, KeyValue keyValue) {
+        query = new ElasticsearchQuery(this);
+        query.start(recordOffset, keyValue);
+        LOGGER.info("ElasticsearchReplicator start succeed");
+    }
+
+    /**
+     * Stops the running query. Does nothing if {@link #start} was never called, instead of
+     * failing with a NullPointerException.
+     */
+    public void stop() {
+        if (query == null) {
+            return;
+        }
+        query.stop();
+    }
+
+    public ElasticsearchConfig getConfig() {
+        return this.config;
+    }
+
+    /**
+     * Appends a hit to the buffer. The queue is unbounded, so this does not block.
+     *
+     * @param data hit to enqueue
+     */
+    public void commit(SearchHit data) {
+        queue.add(data);
+    }
+
+    public BlockingQueue<SearchHit> getQueue() {
+        return this.queue;
+    }
+}
diff --git
a/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfigTest.java
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfigTest.java
new file mode 100644
index 00000000..65df94bc
--- /dev/null
+++
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfigTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.config;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ElasticsearchConfigTest {
+
+    private ElasticsearchConfig config;
+
+    @Before
+    public void before() {
+        config = new ElasticsearchConfig();
+    }
+
+    /**
+     * {@code load(KeyValue)} should expose host, port and index through the matching getters.
+     */
+    @Test
+    public void loadTest() {
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(ElasticsearchConstant.ES_HOST, "localhost");
+        keyValue.put(ElasticsearchConstant.ES_PORT, 9200);
+        keyValue.put(ElasticsearchConstant.INDEX, "index");
+        config.load(keyValue);
+        Assert.assertEquals("localhost", config.getElasticsearchHost());
+        // Casting to long works whether the getter returns int or Integer. Unlike
+        // assertTrue(a == b), a mismatch reports both the expected and the actual port.
+        Assert.assertEquals(9200L, (long) config.getElasticsearchPort());
+        Assert.assertEquals("index", config.getIndex());
+    }
+}
diff --git
a/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnectorTest.java
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnectorTest.java
new file mode 100644
index 00000000..fcb45bd2
--- /dev/null
+++
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnectorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConstant;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ElasticsearchSourceConnectorTest {
+
+    private ElasticsearchSourceConnector connector;
+
+    @Before
+    public void before() {
+        this.connector = new ElasticsearchSourceConnector();
+    }
+
+    /** Starting the connector with host, port and index configured must not throw. */
+    @Test
+    public void startTest() {
+        final KeyValue props = new DefaultKeyValue();
+        props.put(ElasticsearchConstant.ES_HOST, "localhost");
+        props.put(ElasticsearchConstant.ES_PORT, 9200);
+        props.put(ElasticsearchConstant.INDEX, "index");
+        Assertions.assertThatCode(() -> connector.start(props))
+            .doesNotThrowAnyException();
+    }
+}
diff --git
a/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQueryTest.java
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQueryTest.java
new file mode 100644
index 00000000..a44fac0d
--- /dev/null
+++
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQueryTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.replicator.source;
+
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConfig;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ElasticsearchQueryTest {
+
+ private ElasticsearchQuery query;
+
+ private ElasticsearchReplicator replicator;
+
+ private ElasticsearchConfig config;
+
+ @Before
+ public void before() {
+ config = new ElasticsearchConfig();
+ config.setElasticsearchHost("localhost");
+ config.setElasticsearchPort(9200);
+
config.setIndex("{\"index_connect\":{\"id\":1},\"index_connect2\":{\"id\":2}}");
+ replicator = new ElasticsearchReplicator(config);
+ query = new ElasticsearchQuery(replicator);
+ }
+
+ @Test
+ public void startTest() {
+ Assertions.assertThatCode(() -> query.start(null, new
DefaultKeyValue())).doesNotThrowAnyException();
+ }
+}