This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c4f955  [catalog] add array map json type for flink catalog (#291)
9c4f955 is described below

commit 9c4f9550081aa7e8e0b5b7e79d5bc78c357a9132
Author: wudi <[email protected]>
AuthorDate: Fri Jan 12 17:30:48 2024 +0800

    [catalog] add array map json type for flink catalog (#291)
---
 .../apache/doris/flink/catalog/DorisCatalog.java   |  1 -
 .../doris/flink/catalog/DorisTypeMapper.java       | 10 +++++
 .../doris/flink/catalog/doris/DorisType.java       |  3 ++
 .../apache/doris/flink/catalog/CatalogExample.java | 50 ++++++++++++++++++++++
 4 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
index 99ca0a4..96518d3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -283,7 +283,6 @@ public class DorisCatalog extends AbstractCatalog {
                 String columnType = resultSet.getString("DATA_TYPE");
                 long columnSize = resultSet.getLong("COLUMN_SIZE");
                 long columnDigit = resultSet.getLong("DECIMAL_DIGITS");
-
                 DataType flinkType =
                         DorisTypeMapper.toFlinkType(
                                 columnName, columnType, (int) columnSize, (int) columnDigit);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index c388700..cc5fe4b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 
 import org.apache.doris.flink.catalog.doris.DorisType;
 
+import static org.apache.doris.flink.catalog.doris.DorisType.ARRAY;
 import static org.apache.doris.flink.catalog.doris.DorisType.BIGINT;
 import static org.apache.doris.flink.catalog.doris.DorisType.BOOLEAN;
 import static org.apache.doris.flink.catalog.doris.DorisType.CHAR;
@@ -52,10 +53,13 @@ import static org.apache.doris.flink.catalog.doris.DorisType.DECIMAL_V3;
 import static org.apache.doris.flink.catalog.doris.DorisType.DOUBLE;
 import static org.apache.doris.flink.catalog.doris.DorisType.FLOAT;
 import static org.apache.doris.flink.catalog.doris.DorisType.INT;
+import static org.apache.doris.flink.catalog.doris.DorisType.JSON;
 import static org.apache.doris.flink.catalog.doris.DorisType.JSONB;
 import static org.apache.doris.flink.catalog.doris.DorisType.LARGEINT;
+import static org.apache.doris.flink.catalog.doris.DorisType.MAP;
 import static org.apache.doris.flink.catalog.doris.DorisType.SMALLINT;
 import static org.apache.doris.flink.catalog.doris.DorisType.STRING;
+import static org.apache.doris.flink.catalog.doris.DorisType.STRUCT;
 import static org.apache.doris.flink.catalog.doris.DorisType.TINYINT;
 import static org.apache.doris.flink.catalog.doris.DorisType.VARCHAR;
 
@@ -101,6 +105,12 @@ public class DorisTypeMapper {
             case LARGEINT:
             case STRING:
             case JSONB:
+            case JSON:
+                // Currently, the subtype of the generic cannot be obtained,
+                // so it is mapped to string
+            case ARRAY:
+            case MAP:
+            case STRUCT:
                 return DataTypes.STRING();
             case DATE:
             case DATE_V2:
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
index d242320..3779143 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
@@ -39,4 +39,7 @@ public class DorisType {
     public static final String BITMAP = "BITMAP";
     public static final String ARRAY = "ARRAY";
     public static final String JSONB = "JSONB";
+    public static final String JSON = "JSON";
+    public static final String MAP = "MAP";
+    public static final String STRUCT = "STRUCT";
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java
new file mode 100644
index 0000000..1d7cf1d
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.catalog;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+public class CatalogExample {
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        tEnv.executeSql(
+                "CREATE CATALOG doris_catalog WITH(\n"
+                        + "'type' = 'doris',\n"
+                        + "'default-database' = 'test',\n"
+                        + "'username' = 'root',\n"
+                        + "'password' = '',\n"
+                        + "'fenodes' = '1127.0.0.1:8030',\n"
+                        + "'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',\n"
+                        + "'sink.label-prefix' = 'label'\n"
+                        + ")");
+        // define a dynamic aggregating query
+        final Table result = tEnv.sqlQuery("SELECT * from doris_catalog.test.type_test");
+
+        // print the result to the console
+        tEnv.toRetractStream(result, Row.class).print();
+        env.execute();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to