This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 9ee4d8394 Add ElasticSearch catalog (#4108)
9ee4d8394 is described below
commit 9ee4d8394c06e05078b8a216e04bf67f9b3bc73c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Feb 13 09:40:40 2023 +0800
Add ElasticSearch catalog (#4108)
---
.../api/table/catalog/DataTypeConvertor.java | 8 +-
.../catalog/ElasticSearchCatalog.java | 158 +++++++++++++++++++++
.../catalog/ElasticSearchCatalogFactory.java | 44 ++++++
.../catalog/ElasticSearchDataTypeConvertor.java} | 33 ++---
.../elasticsearch/client/EsRestClient.java | 46 ++++++
.../exception/ElasticsearchConnectorErrorCode.java | 5 +-
.../jdbc/catalog/MysqlDataTypeConvertor.java | 12 +-
.../kafka/catalog/KafkaDataTypeConvertor.java | 9 +-
8 files changed, 280 insertions(+), 35 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
index 66d10e4d4..5ce924aff 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
@@ -30,20 +30,20 @@ public interface DataTypeConvertor<T> {
/**
* Transfer the data type from connector to SeaTunnel.
*
- * @param connectorType e.g. "int", "varchar(255)"
+ * @param connectorDataType e.g. "int", "varchar(255)"
* @return the data type of SeaTunnel
*/
- SeaTunnelDataType<?> toSeaTunnelType(String connectorType);
+ SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType);
/**
* Transfer the data type from connector to SeaTunnel.
*
- * @param connectorType origin data type
+ * @param connectorDataType origin data type
* @param dataTypeProperties origin data type properties, e.g. precision,
scale, length
* @return SeaTunnel data type
*/
// todo: If the origin data type contains the properties, we can remove
the dataTypeProperties.
- SeaTunnelDataType<?> toSeaTunnelType(T connectorType, Map<String, Object>
dataTypeProperties) throws DataTypeConvertException;
+ SeaTunnelDataType<?> toSeaTunnelType(T connectorDataType, Map<String,
Object> dataTypeProperties) throws DataTypeConvertException;
/**
* Transfer the data type from SeaTunnel to connector.
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
new file mode 100644
index 000000000..ca725453d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Elasticsearch catalog implementation.
+ * <p>In ElasticSearch, we use the index as the database and table.
+ */
+public class ElasticSearchCatalog implements Catalog {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ElasticSearchCatalog.class);
+
+ private final String catalogName;
+ private final String defaultDatabase;
+ private final Config pluginConfig;
+
+ private EsRestClient esRestClient;
+
+ // todo: do we need default database?
+ public ElasticSearchCatalog(String catalogName, String defaultDatabase,
Config elasticSearchConfig) {
+ this.catalogName = checkNotNull(catalogName, "catalogName cannot be
null");
+ this.defaultDatabase = defaultDatabase;
+ this.pluginConfig = checkNotNull(elasticSearchConfig,
"elasticSearchConfig cannot be null");
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ try {
+ esRestClient = EsRestClient.createInstance(pluginConfig);
+ ElasticsearchClusterInfo elasticsearchClusterInfo =
esRestClient.getClusterInfo();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Success open es catalog: {}, cluster info: {}",
catalogName, elasticsearchClusterInfo);
+ }
+ } catch (Exception e) {
+ throw new CatalogException(String.format("Failed to open catalog
%s", catalogName), e);
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ esRestClient.close();
+ }
+
+ @Override
+ public String getDefaultDatabase() throws CatalogException {
+ return defaultDatabase;
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ // check if the index exist
+ try {
+ List<IndexDocsCount> indexDocsCount =
esRestClient.getIndexDocsCount(databaseName);
+ return true;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed to check if catalog %s database %s
exists", catalogName, databaseName), e);
+ }
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return esRestClient.listIndex();
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws
CatalogException, DatabaseNotExistException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(catalogName, databaseName);
+ }
+ return Lists.newArrayList(databaseName);
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ checkNotNull(tablePath);
+ // todo: Check if the database name is the same with table name
+ return databaseExists(tablePath.getTableName());
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath) throws CatalogException,
TableNotExistException {
+ // Get the index mapping?
+ return null;
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ // Create the index
+ }
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
+ checkNotNull(tablePath);
+ if (!tableExists(tablePath) && !ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ try {
+ esRestClient.dropIndex(tablePath.getTableName());
+ } catch (Exception ex) {
+ throw new CatalogException(
+ String.format("Failed to drop table %s in catalog %s",
tablePath.getTableName(), catalogName), ex);
+ }
+ }
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException("Elasticsearch does not
support create database");
+ }
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
+ dropTable(tablePath, ignoreIfNotExists);
+ }
+
+ @Override
+ public DataTypeConvertor<?> getDataTypeConvertor() {
+ return ElasticSearchDataTypeConvertor.getInstance();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
new file mode 100644
index 000000000..55e3460f9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+
+public class ElasticSearchCatalogFactory implements CatalogFactory {
+
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ // todo:
+ return null;
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ // todo:
+ return "Elasticsearch";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ // todo:
+ return null;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
similarity index 67%
copy from
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
copy to
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
index 70e14c9f9..1d8f4e51e 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
@@ -15,38 +15,29 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import java.util.Map;
-/**
- * The data type convertor of Kafka, only fields defined in schema has the
type.
- * e.g.
- * <pre>
- * schema = {
- * fields {
- * name = "string"
- * age = "int"
- * }
- * }
- * </pre>
- * <p> Right now the data type of kafka is SeaTunnelType, so we don't need to
convert the data type.
- */
-public class KafkaDataTypeConvertor implements
DataTypeConvertor<SeaTunnelDataType<?>> {
+public class ElasticSearchDataTypeConvertor implements
DataTypeConvertor<SeaTunnelDataType<?>> {
+
+ private static final ElasticSearchDataTypeConvertor INSTANCE = new
ElasticSearchDataTypeConvertor();
- private static final KafkaDataTypeConvertor INSTANCE = new
KafkaDataTypeConvertor();
+ private ElasticSearchDataTypeConvertor() {
- private KafkaDataTypeConvertor() {
}
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorType) {
- // todo: Do we have utils to deserialize a string to SeaTunnelDataType?
- return null;
+ public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ checkNotNull(connectorDataType);
+ return SeaTunnelSchema.parseTypeByString(connectorDataType);
}
@Override
@@ -59,7 +50,7 @@ public class KafkaDataTypeConvertor implements
DataTypeConvertor<SeaTunnelDataTy
return seaTunnelDataType;
}
- public static KafkaDataTypeConvertor getInstance() {
+ public static ElasticSearchDataTypeConvertor getInstance() {
return INSTANCE;
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 2a4ea77a1..b32585503 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -59,6 +59,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
@Slf4j
public class EsRestClient {
@@ -345,6 +346,51 @@ public class EsRestClient {
}
}
+ public List<String> listIndex() {
+ String endpoint = "/_cat/indices?format=json";
+ Request request = new Request("GET", endpoint);
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.LIST_INDEX_FAILED,
+ "GET " + endpoint + " response null");
+ }
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ String entity = EntityUtils.toString(response.getEntity());
+ return JsonUtils.toList(entity, Map.class)
+ .stream()
+ .map(map -> map.get("index").toString())
+ .collect(Collectors.toList());
+ } else {
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.LIST_INDEX_FAILED,
+ String.format("GET %s response status code=%d", endpoint,
response.getStatusLine().getStatusCode()));
+ }
+ } catch (IOException ex) {
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.LIST_INDEX_FAILED,
ex);
+ }
+ }
+
+ public void dropIndex(String tableName) {
+ String endpoint = String.format("/%s", tableName);
+ Request request = new Request("DELETE", endpoint);
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.DROP_INDEX_FAILED,
+ "DELETE " + endpoint + " response null");
+ }
+ // todo: if the index doesn't exist, the response status code is
200?
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ return;
+ } else {
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.DROP_INDEX_FAILED,
+ String.format("DELETE %s response status code=%d",
endpoint, response.getStatusLine().getStatusCode()));
+ }
+ } catch (IOException ex) {
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.DROP_INDEX_FAILED,
ex);
+ }
+ }
+
/**
* get es field name and type mapping relation
*
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
index 2fb5c11c9..739d7d8b2 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -24,7 +24,10 @@ public enum ElasticsearchConnectorErrorCode implements
SeaTunnelErrorCode {
BULK_RESPONSE_ERROR("ELASTICSEARCH-01", "Bulk es response error"),
GET_ES_VERSION_FAILED("ELASTICSEARCH-02", "Get elasticsearch version
failed"),
SCROLL_REQUEST_ERROR("ELASTICSEARCH-03", "Fail to scroll request"),
- GET_INDEX_DOCS_COUNT_FAILED("ELASTICSEARCH-04", "Get elasticsearch
document index count failed");
+ GET_INDEX_DOCS_COUNT_FAILED("ELASTICSEARCH-04", "Get elasticsearch
document index count failed"),
+ LIST_INDEX_FAILED("ELASTICSEARCH-05", "List elasticsearch index failed"),
+ DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"),
+ ;
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
index bc1686a3a..ea0e3f7d1 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
@@ -53,21 +53,21 @@ public class MysqlDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
public static final Integer DEFAULT_SCALE = 0;
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorType) {
- checkNotNull(connectorType, "connectorType can not be null");
- MysqlType mysqlType = MysqlType.getByName(connectorType);
+ public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ checkNotNull(connectorDataType, "connectorDataType can not be null");
+ MysqlType mysqlType = MysqlType.getByName(connectorDataType);
Map<String, Object> dataTypeProperties;
switch (mysqlType) {
case BIGINT_UNSIGNED:
case DECIMAL:
case DECIMAL_UNSIGNED:
// parse precision and scale
- int left = connectorType.indexOf("(");
- int right = connectorType.indexOf(")");
+ int left = connectorDataType.indexOf("(");
+ int right = connectorDataType.indexOf(")");
int precision = DEFAULT_PRECISION;
int scale = DEFAULT_SCALE;
if (left != -1 && right != -1) {
- String[] precisionAndScale = connectorType.substring(left
+ 1, right).split(",");
+ String[] precisionAndScale =
connectorDataType.substring(left + 1, right).split(",");
if (precisionAndScale.length == 2) {
precision = Integer.parseInt(precisionAndScale[0]);
scale = Integer.parseInt(precisionAndScale[1]);
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
index 70e14c9f9..b8e315a2d 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
@@ -17,9 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import java.util.Map;
@@ -44,9 +47,9 @@ public class KafkaDataTypeConvertor implements
DataTypeConvertor<SeaTunnelDataTy
}
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorType) {
- // todo: Do we have utils to deserialize a string to SeaTunnelDataType?
- return null;
+ public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ checkNotNull(connectorDataType, "connectorDataType can not be null");
+ return SeaTunnelSchema.parseTypeByString(connectorDataType);
}
@Override