This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d952cea43c [feature][doris] Doris factory type (#5061)
d952cea43c is described below

commit d952cea43cba309a8ca41b1180d87e4e5f816cde
Author: XiaoJiang521 <[email protected]>
AuthorDate: Sun Aug 6 22:26:23 2023 +0800

    [feature][doris] Doris factory type (#5061)
    
    * [feature][doris] Web need factory and data type convertor
---
 .../connectors/doris/config/DorisSinkFactory.java  |  48 +++++
 .../doris/datatype/DorisDataTypeConvertor.java     | 200 +++++++++++++++++++++
 2 files changed, 248 insertions(+)

diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
new file mode 100644
index 0000000000..1e08da60a9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.config;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class DorisSinkFactory implements TableSinkFactory {
+
+    public static final String IDENTIFIER = "Doris";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(
+                        DorisConfig.FENODES,
+                        DorisConfig.USERNAME,
+                        DorisConfig.PASSWORD,
+                        DorisConfig.SINK_LABEL_PREFIX,
+                        DorisConfig.DORIS_SINK_CONFIG_PREFIX)
+                .optional(DorisConfig.SINK_ENABLE_2PC, 
DorisConfig.SINK_ENABLE_DELETE)
+                .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
new file mode 100644
index 0000000000..7c9f08dfb7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.datatype;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import org.apache.commons.collections4.MapUtils;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@AutoService(DataTypeConvertor.class)
+public class DorisDataTypeConvertor implements DataTypeConvertor<String> {
+
+    public static final String NULL = "NULL";
+    public static final String BOOLEAN = "BOOLEAN";
+    public static final String TINYINT = "TINYINT";
+    public static final String SMALLINT = "SMALLINT";
+    public static final String INT = "INT";
+    public static final String BIGINT = "BIGINT";
+    public static final String FLOAT = "FLOAT";
+    public static final String DOUBLE = "DOUBLE";
+    public static final String DECIMAL = "DECIMAL";
+    public static final String DATE = "DATE";
+    public static final String DATETIME = "DATETIME";
+    public static final String CHAR = "CHAR";
+    public static final String VARCHAR = "VARCHAR";
+    public static final String BINARY = "BINARY";
+    public static final String VARBINARY = "VARBINARY";
+    public static final String ARRAY = "ARRAY";
+    public static final String MAP = "MAP";
+    public static final String STRUCT = "STRUCT";
+    public static final String UNION = "UNION";
+    public static final String INTERVAL = "INTERVAL";
+    public static final String TIMESTAMP = "TIMESTAMP";
+    public static final String YEAR = "YEAR";
+    public static final String GEOMETRY = "GEOMETRY";
+    public static final String IP = "IP";
+
+    public static final String PRECISION = "precision";
+    public static final String SCALE = "scale";
+
+    public static final Integer DEFAULT_PRECISION = 10;
+
+    public static final Integer DEFAULT_SCALE = 0;
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        checkNotNull(connectorDataType, "connectorDataType can not be null");
+        Map<String, Object> dataTypeProperties;
+        switch (connectorDataType.toUpperCase(Locale.ROOT)) {
+            case DECIMAL:
+                // parse precision and scale
+                int left = connectorDataType.indexOf("(");
+                int right = connectorDataType.indexOf(")");
+                int precision = DEFAULT_PRECISION;
+                int scale = DEFAULT_SCALE;
+                if (left != -1 && right != -1) {
+                    String[] precisionAndScale =
+                            connectorDataType.substring(left + 1, 
right).split(",");
+                    if (precisionAndScale.length == 2) {
+                        precision = Integer.parseInt(precisionAndScale[0]);
+                        scale = Integer.parseInt(precisionAndScale[1]);
+                    } else if (precisionAndScale.length == 1) {
+                        precision = Integer.parseInt(precisionAndScale[0]);
+                    }
+                }
+                dataTypeProperties = ImmutableMap.of(PRECISION, precision, 
SCALE, scale);
+                break;
+            default:
+                dataTypeProperties = Collections.emptyMap();
+                break;
+        }
+        return toSeaTunnelType(connectorDataType, dataTypeProperties);
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            String connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        checkNotNull(connectorDataType, "mysqlType can not be null");
+        int precision;
+        int scale;
+        switch (connectorDataType.toUpperCase(Locale.ROOT)) {
+            case NULL:
+                return BasicType.VOID_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case TINYINT:
+                return BasicType.BYTE_TYPE;
+            case SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case INT:
+            case YEAR:
+                return BasicType.INT_TYPE;
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TIMESTAMP:
+            case DATETIME:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case CHAR:
+            case VARCHAR:
+                return BasicType.STRING_TYPE;
+            case BINARY:
+            case VARBINARY:
+            case GEOMETRY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case DECIMAL:
+                precision = MapUtils.getInteger(dataTypeProperties, PRECISION, 
DEFAULT_PRECISION);
+                scale = MapUtils.getInteger(dataTypeProperties, SCALE, 
DEFAULT_SCALE);
+                return new DecimalType(precision, scale);
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Doesn't support DORIS type '%s''  
yet.", connectorDataType));
+        }
+    }
+
+    @Override
+    public String toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            throws DataTypeConvertException {
+        checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        // todo: verify
+        switch (sqlType) {
+            case ARRAY:
+                return ARRAY;
+            case MAP:
+            case ROW:
+            case STRING:
+            case NULL:
+                return VARCHAR;
+            case BOOLEAN:
+                return BOOLEAN;
+            case TINYINT:
+                return TINYINT;
+            case SMALLINT:
+                return SMALLINT;
+            case INT:
+                return INT;
+            case BIGINT:
+                return BIGINT;
+            case FLOAT:
+                return FLOAT;
+            case DOUBLE:
+                return DOUBLE;
+            case DECIMAL:
+                return DECIMAL;
+            case BYTES:
+                return BINARY;
+            case DATE:
+                return DATE;
+            case TIME:
+            case TIMESTAMP:
+                return TIMESTAMP;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Doesn't support Doris type '%s''  
yet.", sqlType));
+        }
+    }
+
+    @Override
+    public String getIdentity() {
+        return "Doris";
+    }
+}

Reply via email to