This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d952cea43c [feature][doris] Doris factory type (#5061)
d952cea43c is described below
commit d952cea43cba309a8ca41b1180d87e4e5f816cde
Author: XiaoJiang521 <[email protected]>
AuthorDate: Sun Aug 6 22:26:23 2023 +0800
[feature][doris] Doris factory type (#5061)
* [feature][doris] Web need factory and data type convertor
---
.../connectors/doris/config/DorisSinkFactory.java | 48 +++++
.../doris/datatype/DorisDataTypeConvertor.java | 200 +++++++++++++++++++++
2 files changed, 248 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
new file mode 100644
index 0000000000..1e08da60a9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.config;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class DorisSinkFactory implements TableSinkFactory {
+
+ public static final String IDENTIFIER = "Doris";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(
+ DorisConfig.FENODES,
+ DorisConfig.USERNAME,
+ DorisConfig.PASSWORD,
+ DorisConfig.SINK_LABEL_PREFIX,
+ DorisConfig.DORIS_SINK_CONFIG_PREFIX)
+ .optional(DorisConfig.SINK_ENABLE_2PC,
DorisConfig.SINK_ENABLE_DELETE)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
new file mode 100644
index 0000000000..7c9f08dfb7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.datatype;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import org.apache.commons.collections4.MapUtils;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@AutoService(DataTypeConvertor.class)
+public class DorisDataTypeConvertor implements DataTypeConvertor<String> {
+
+ public static final String NULL = "NULL";
+ public static final String BOOLEAN = "BOOLEAN";
+ public static final String TINYINT = "TINYINT";
+ public static final String SMALLINT = "SMALLINT";
+ public static final String INT = "INT";
+ public static final String BIGINT = "BIGINT";
+ public static final String FLOAT = "FLOAT";
+ public static final String DOUBLE = "DOUBLE";
+ public static final String DECIMAL = "DECIMAL";
+ public static final String DATE = "DATE";
+ public static final String DATETIME = "DATETIME";
+ public static final String CHAR = "CHAR";
+ public static final String VARCHAR = "VARCHAR";
+ public static final String BINARY = "BINARY";
+ public static final String VARBINARY = "VARBINARY";
+ public static final String ARRAY = "ARRAY";
+ public static final String MAP = "MAP";
+ public static final String STRUCT = "STRUCT";
+ public static final String UNION = "UNION";
+ public static final String INTERVAL = "INTERVAL";
+ public static final String TIMESTAMP = "TIMESTAMP";
+ public static final String YEAR = "YEAR";
+ public static final String GEOMETRY = "GEOMETRY";
+ public static final String IP = "IP";
+
+ public static final String PRECISION = "precision";
+ public static final String SCALE = "scale";
+
+ public static final Integer DEFAULT_PRECISION = 10;
+
+ public static final Integer DEFAULT_SCALE = 0;
+
+ @Override
+ public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ checkNotNull(connectorDataType, "connectorDataType can not be null");
+ Map<String, Object> dataTypeProperties;
+ switch (connectorDataType.toUpperCase(Locale.ROOT)) {
+ case DECIMAL:
+ // parse precision and scale
+ int left = connectorDataType.indexOf("(");
+ int right = connectorDataType.indexOf(")");
+ int precision = DEFAULT_PRECISION;
+ int scale = DEFAULT_SCALE;
+ if (left != -1 && right != -1) {
+ String[] precisionAndScale =
+ connectorDataType.substring(left + 1,
right).split(",");
+ if (precisionAndScale.length == 2) {
+ precision = Integer.parseInt(precisionAndScale[0]);
+ scale = Integer.parseInt(precisionAndScale[1]);
+ } else if (precisionAndScale.length == 1) {
+ precision = Integer.parseInt(precisionAndScale[0]);
+ }
+ }
+ dataTypeProperties = ImmutableMap.of(PRECISION, precision,
SCALE, scale);
+ break;
+ default:
+ dataTypeProperties = Collections.emptyMap();
+ break;
+ }
+ return toSeaTunnelType(connectorDataType, dataTypeProperties);
+ }
+
+ @Override
+ public SeaTunnelDataType<?> toSeaTunnelType(
+ String connectorDataType, Map<String, Object> dataTypeProperties)
+ throws DataTypeConvertException {
+ checkNotNull(connectorDataType, "mysqlType can not be null");
+ int precision;
+ int scale;
+ switch (connectorDataType.toUpperCase(Locale.ROOT)) {
+ case NULL:
+ return BasicType.VOID_TYPE;
+ case BOOLEAN:
+ return BasicType.BOOLEAN_TYPE;
+ case TINYINT:
+ return BasicType.BYTE_TYPE;
+ case SMALLINT:
+ return BasicType.SHORT_TYPE;
+ case INT:
+ case YEAR:
+ return BasicType.INT_TYPE;
+ case BIGINT:
+ return BasicType.LONG_TYPE;
+ case FLOAT:
+ return BasicType.FLOAT_TYPE;
+ case DOUBLE:
+ return BasicType.DOUBLE_TYPE;
+ case DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case TIMESTAMP:
+ case DATETIME:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case CHAR:
+ case VARCHAR:
+ return BasicType.STRING_TYPE;
+ case BINARY:
+ case VARBINARY:
+ case GEOMETRY:
+ return PrimitiveByteArrayType.INSTANCE;
+ case DECIMAL:
+ precision = MapUtils.getInteger(dataTypeProperties, PRECISION,
DEFAULT_PRECISION);
+ scale = MapUtils.getInteger(dataTypeProperties, SCALE,
DEFAULT_SCALE);
+ return new DecimalType(precision, scale);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Doesn't support DORIS type '%s''
yet.", connectorDataType));
+ }
+ }
+
+ @Override
+ public String toConnectorType(
+ SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ throws DataTypeConvertException {
+ checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
+ SqlType sqlType = seaTunnelDataType.getSqlType();
+ // todo: verify
+ switch (sqlType) {
+ case ARRAY:
+ return ARRAY;
+ case MAP:
+ case ROW:
+ case STRING:
+ case NULL:
+ return VARCHAR;
+ case BOOLEAN:
+ return BOOLEAN;
+ case TINYINT:
+ return TINYINT;
+ case SMALLINT:
+ return SMALLINT;
+ case INT:
+ return INT;
+ case BIGINT:
+ return BIGINT;
+ case FLOAT:
+ return FLOAT;
+ case DOUBLE:
+ return DOUBLE;
+ case DECIMAL:
+ return DECIMAL;
+ case BYTES:
+ return BINARY;
+ case DATE:
+ return DATE;
+ case TIME:
+ case TIMESTAMP:
+ return TIMESTAMP;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Doesn't support Doris type '%s''
yet.", sqlType));
+ }
+ }
+
+ @Override
+ public String getIdentity() {
+ return "Doris";
+ }
+}