This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5834e0f7a [INLONG-7049][Manager] Support complex data types in Hudi
Sink (#7050)
5834e0f7a is described below
commit 5834e0f7abeb96c92126a1122f28ced4db287ef0
Author: feat <[email protected]>
AuthorDate: Mon Dec 26 20:46:19 2022 +0800
[INLONG-7049][Manager] Support complex data types in Hudi Sink (#7050)
---
.../inlong/manager/pojo/sink/hudi/HudiType.java | 34 +++++++------
.../resource/sink/hudi/HudiCatalogClient.java | 19 ++++----
.../resource/sink/hudi/HudiTypeConverter.java | 55 ++++++++++++++++++++++
3 files changed, 84 insertions(+), 24 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
index 672b07f8f..392ca7794 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
@@ -24,26 +24,30 @@ import lombok.Getter;
*/
public enum HudiType {
- BOOLEAN("boolean"),
- INT("int"),
- LONG("long"),
- FLOAT("float"),
- DOUBLE("double"),
- DECIMAL("decimal"),
- DATE("date"),
- TIME("time"),
- TIMESTAMP("timestamp"),
- TIMESTAMPTZ("timestamptz"),
- STRING("string"),
- UUID("uuid"),
- FIXED("fixed"),
- BINARY("binary");
+ BOOLEAN("boolean", "boolean"),
+ INT("int", "int"),
+ LONG("long", "bigint"),
+ FLOAT("float", "float"),
+ DOUBLE("double", "double"),
+ DATE("date", "date"),
+ TIME("time", "time(0)"),
+ TIMESTAMP("timestamp", "timestamp(3)"),
+ TIMESTAMPT_Z("timestamptz", "timestamp(6)"),
+ STRING("string", "varchar(" + Integer.MAX_VALUE + ")"),
+ BINARY("binary", "tinyint"),
+ UUID("uuid", "uuid"),
+ FIXED("fixed", null),
+ DECIMAL("decimal", null);
@Getter
private final String type;
- HudiType(String type) {
+ @Getter
+ private final String hiveType;
+
+ HudiType(String type, String hiveType) {
this.type = type;
+ this.hiveType = hiveType;
}
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
index 124dcaedb..8055114ad 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.service.resource.sink.hudi;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -191,14 +190,16 @@ public class HudiCatalogClient {
String location = this.warehouse + "/" + dbName + ".db" + "/" +
tableName;
properties.put("path", location);
- List<FieldSchema> cols = new ArrayList<>();
- for (HudiColumnInfo column : tableInfo.getColumns()) {
- FieldSchema fieldSchema = new FieldSchema();
- fieldSchema.setName(column.getName());
- fieldSchema.setType(column.getType());
- fieldSchema.setComment(column.getDesc());
- cols.add(fieldSchema);
- }
+ List<FieldSchema> cols = tableInfo.getColumns()
+ .stream()
+ .map(column -> {
+ FieldSchema fieldSchema = new FieldSchema();
+ fieldSchema.setName(column.getName());
+ fieldSchema.setType(HudiTypeConverter.convert(column));
+ fieldSchema.setComment(column.getDesc());
+ return fieldSchema;
+ })
+ .collect(Collectors.toList());
// Build storage of hudi table
StorageDescriptor sd = new StorageDescriptor();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiTypeConverter.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiTypeConverter.java
new file mode 100644
index 000000000..ebe24a6e7
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiTypeConverter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.hudi;
+
+import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiType;
+
+import java.util.Optional;
+
+/**
+ * Converts the declared type of a {@link HudiColumnInfo} into a Hive column type string.
+ * <ul>
+ * <li>{@code decimal} is rendered as {@code decimal(precision, scale)} using the column's values.
+ * <li>{@code fixed} is rendered as {@code fixed(length)} using the column's length.
+ * <li>Every other type uses the Hive type declared on its {@link HudiType} constant.
+ * </ul>
+ *
+ * @see org.apache.flink.table.types.utils.TypeInfoDataTypeConverter
+ */
+public class HudiTypeConverter {
+
+    /**
+     * Converts the field type of a column to its Hive field type.
+     *
+     * @param column the column whose type is converted; {@code null} is rejected
+     * @return the Hive type string for the column
+     * @throws RuntimeException if the column is null, {@code HudiType.forType} yields null, or no Hive type is declared
+     */
+    public static String convert(HudiColumnInfo column) {
+        HudiType hudiType = column == null ? null : HudiType.forType(column.getType());
+        if (hudiType == HudiType.DECIMAL) {
+            return String.format("decimal(%d, %d)", column.getPrecision(), column.getScale());
+        }
+        if (hudiType == HudiType.FIXED) {
+            return String.format("fixed(%d)", column.getLength());
+        }
+        return Optional.ofNullable(hudiType).map(HudiType::getHiveType)
+                .orElseThrow(() -> new RuntimeException("Can not properly convert type of column: " + column));
+    }
+}