This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 19ba51d4a9d609b520c4b18f7cab14024c74894f Author: thesumery <[email protected]> AuthorDate: Tue Nov 8 16:00:05 2022 +0800 [INLONG-6409][Sort] Unspported Time and Timestamp iceberg auto create table in spark session query (#6424) Co-authored-by: thesumery <[email protected]> --- .../org/apache/inlong/sort/base/Constants.java | 7 + .../base/format/AbstractDynamicSchemaFormat.java | 7 - .../base/format/CanalJsonDynamicSchemaFormat.java | 25 +- .../format/DebeziumJsonDynamicSchemaFormat.java | 24 +- .../base/format/DynamicSchemaFormatFactory.java | 39 +- .../sort/base/format/JsonDynamicSchemaFormat.java | 50 ++- .../sort/base/format/JsonToRowDataConverters.java | 414 +++++++++++++++++++++ .../inlong/sort/base/sink/MultipleSinkOption.java | 24 +- .../format/CanalJsonDynamicSchemaFormatTest.java | 12 +- .../DebeziumJsonDynamicSchemaFormatTest.java | 3 +- ...eziumJsonDynamicSchemaFormatWithSchemaTest.java | 3 +- .../sort/doris/table/DorisDynamicTableFactory.java | 6 +- .../sort/iceberg/FlinkDynamicTableFactory.java | 2 + .../inlong/sort/iceberg/IcebergTableSink.java | 2 + .../sink/multiple/DynamicSchemaHandleOperator.java | 4 +- .../sort/kafka/table/KafkaDynamicTableFactory.java | 5 +- 16 files changed, 539 insertions(+), 88 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 19c58e9c1..9cc3c979d 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -163,4 +163,11 @@ public final class Constants { .booleanType() .defaultValue(true) .withDescription("Whether ignore the single table erros when multiple sink writing scenario."); + + public static final ConfigOption<Boolean> SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK = + ConfigOptions.key("sink.multiple.typemap-compatible-with-spark") + .booleanType() + .defaultValue(false) + .withDescription("Because spark do not support iceberg data type: `timestamp without time zone` and" + + "`time`, so type conversions must be mapped to types supported by spark."); } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java index 2def7bb4b..f32b0086e 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java @@ -187,11 +187,4 @@ public abstract class AbstractDynamicSchemaFormat<T> { * @throws IOException The exception will throws */ public abstract String parse(T data, String pattern) throws IOException; - - /** - * Get the identifier of this dynamic schema format - * - * @return The identifier of this dynamic schema format - */ - public abstract String identifier(); } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java index 88e51df7b..81256eda1 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java @@ -17,22 +17,22 @@ package org.apache.inlong.sort.base.format; -import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; +import org.apache.inlong.sort.base.format.JsonToRowDataConverters.JsonToRowDataConverter; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Canal json dynamic format */ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat { - private static final String IDENTIFIER = "canal-json"; private static final String DDL_FLAG = "ddl"; private static final String DATA = "data"; private static final String OLD = "old"; @@ -43,15 +43,8 @@ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat { private static final String OP_UPDATE = "UPDATE"; private static final String OP_DELETE = "DELETE"; - private static final CanalJsonDynamicSchemaFormat FORMAT = new CanalJsonDynamicSchemaFormat(); - - private CanalJsonDynamicSchemaFormat() { - - } - - @SuppressWarnings("rawtypes") - public static AbstractDynamicSchemaFormat getInstance() { - return FORMAT; + protected CanalJsonDynamicSchemaFormat(Map<String, String> props) { + super(props); } @Override @@ -170,14 +163,4 @@ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat { return rowDataList; } - - /** - * Get the identifier of this dynamic schema format - * - * @return The identifier of this dynamic schema format - */ - @Override - public String identifier() { - return IDENTIFIER; - } } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java index 9841d01f6..d535b87d1 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java @@ -17,7 +17,6 @@ package org.apache.inlong.sort.base.format; -import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter; import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.table.data.RowData; @@ -33,6 +32,7 @@ import org.apache.flink.table.types.logical.TinyIntType; import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.types.RowKind; +import org.apache.inlong.sort.base.format.JsonToRowDataConverters.JsonToRowDataConverter; import java.io.IOException; import java.util.ArrayList; @@ -44,7 +44,6 @@ import java.util.Map; */ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat { - private static final String IDENTIFIER = "debezium-json"; private static final String DDL_FLAG = "ddl"; private static final String SCHEMA = "schema"; private static final String SQL_TYPE = "sqlType"; @@ -87,15 +86,8 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat { .put("BYTES", new VarBinaryType()) .build(); - private static final DebeziumJsonDynamicSchemaFormat FORMAT = new DebeziumJsonDynamicSchemaFormat(); - - private DebeziumJsonDynamicSchemaFormat() { - - } - - @SuppressWarnings("rawtypes") - public static AbstractDynamicSchemaFormat getInstance() { - return FORMAT; + protected DebeziumJsonDynamicSchemaFormat(Map<String, String> props) { + super(props); } @Override @@ -289,16 +281,6 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat { return extractRowData(payload, rowType); } - /** - * Get the identifier of this dynamic schema format - * - * @return The identifier of this dynamic schema format - */ - @Override - public String identifier() { - return IDENTIFIER; - } - private LogicalType debeziumType2FlinkType(String debeziumType) { if (DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.containsKey(debeziumType.toUpperCase())) { return DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.get(debeziumType.toUpperCase()); diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java index 65a5b5e78..c260bdeaa 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java @@ -17,26 +17,36 @@ package org.apache.inlong.sort.base.format; +import com.google.common.collect.ImmutableMap; import org.apache.flink.util.Preconditions; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; /** * Dynamic schema format factory */ public class DynamicSchemaFormatFactory { - public static final List<AbstractDynamicSchemaFormat<?>> SUPPORT_FORMATS = - new ArrayList<AbstractDynamicSchemaFormat<?>>() { + public static Map<String, Function<Map<String, String>, AbstractDynamicSchemaFormat>> SUPPORT_FORMATS = + ImmutableMap.of( + "canal-json", props -> new CanalJsonDynamicSchemaFormat(props), + "debezium-json", props -> new DebeziumJsonDynamicSchemaFormat(props) + ); - private static final long serialVersionUID = 1L; + /** + * Get format from the format name, it only supports [canal-json|debezium-json] for now + * + * @param identifier The identifier of this format + * @return The dynamic format + */ + @SuppressWarnings("rawtypes") + public static AbstractDynamicSchemaFormat getFormat(String identifier) { + return getFormat(identifier, new HashMap<>()); + } - { - add(CanalJsonDynamicSchemaFormat.getInstance()); - add(DebeziumJsonDynamicSchemaFormat.getInstance()); - } - }; /** * Get format from the format name, it only supports [canal-json|debezium-json] for now @@ -45,10 +55,11 @@ public class DynamicSchemaFormatFactory { * @return The dynamic format */ @SuppressWarnings("rawtypes") - public static AbstractDynamicSchemaFormat getFormat(String identifier) { + public static AbstractDynamicSchemaFormat getFormat(String identifier, Map<String, String> properties) { Preconditions.checkNotNull(identifier, "The identifier is null"); - return Preconditions.checkNotNull(SUPPORT_FORMATS.stream().filter(s -> s.identifier().equals(identifier)) - .findFirst().orElse(null), "Unsupport dynamic schema format for:" + identifier); + return Optional.ofNullable(SUPPORT_FORMATS.get(identifier)) + .orElseThrow(() -> + new UnsupportedOperationException("Unsupport dynamic schema format for:" + identifier)) + .apply(properties); } - } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java index a0d564d0c..e6fc75528 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java @@ -17,8 +17,9 @@ package org.apache.inlong.sort.base.format; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.formats.common.TimestampFormat; -import org.apache.flink.formats.json.JsonToRowDataConverters; import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -51,6 +52,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.regex.Matcher; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK; + /** * Json dynamic format class * This class main handle: @@ -64,6 +67,7 @@ import java.util.regex.Matcher; * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3') * the result of pared will be 'prefix_1_2_3_suffix' */ +@SuppressWarnings("LanguageDetectionInspection") public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> { /** @@ -71,7 +75,7 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma */ private static final Integer FIRST = 0; - private static final Map<Integer, LogicalType> SQL_TYPE_2_ICEBERG_TYPE_MAPPING = + private static final Map<Integer, LogicalType> SQL_TYPE_2_FLINK_TYPE_MAPPING = ImmutableMap.<Integer, LogicalType>builder() .put(java.sql.Types.CHAR, new CharType()) .put(java.sql.Types.VARCHAR, new VarCharType()) @@ -94,12 +98,44 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma .put(java.sql.Types.BOOLEAN, new BooleanType()) .put(java.sql.Types.OTHER, new VarCharType()) .build(); + + private static final Map<Integer, LogicalType> SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING = + ImmutableMap.<Integer, LogicalType>builder() + .put(java.sql.Types.CHAR, new CharType()) + .put(java.sql.Types.VARCHAR, new VarCharType()) + .put(java.sql.Types.SMALLINT, new SmallIntType()) + .put(java.sql.Types.INTEGER, new IntType()) + .put(java.sql.Types.BIGINT, new BigIntType()) + .put(java.sql.Types.REAL, new FloatType()) + .put(java.sql.Types.DOUBLE, new DoubleType()) + .put(java.sql.Types.FLOAT, new FloatType()) + .put(java.sql.Types.DECIMAL, new DecimalType()) + .put(java.sql.Types.NUMERIC, new DecimalType()) + .put(java.sql.Types.BIT, new BooleanType()) + .put(java.sql.Types.TIME, new VarCharType()) + .put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new LocalZonedTimestampType()) + .put(java.sql.Types.TIMESTAMP, new LocalZonedTimestampType()) + .put(java.sql.Types.BINARY, new BinaryType()) + .put(java.sql.Types.VARBINARY, new VarBinaryType()) + .put(java.sql.Types.BLOB, new VarBinaryType()) + .put(java.sql.Types.DATE, new DateType()) + .put(java.sql.Types.BOOLEAN, new BooleanType()) + .put(java.sql.Types.OTHER, new VarCharType()) + .build(); + public final ObjectMapper objectMapper = new ObjectMapper(); protected final JsonToRowDataConverters rowDataConverters; + protected final boolean adaptSparkEngine; - protected JsonDynamicSchemaFormat() { + public JsonDynamicSchemaFormat(Map<String, String> properties) { + ReadableConfig config = Configuration.fromMap(properties); + this.adaptSparkEngine = config.get(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK); this.rowDataConverters = - new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601); + new JsonToRowDataConverters( + false, + false, + TimestampFormat.ISO_8601, + adaptSparkEngine); } /** @@ -286,8 +322,10 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma } private LogicalType sqlType2FlinkType(int jdbcType) { - if (SQL_TYPE_2_ICEBERG_TYPE_MAPPING.containsKey(jdbcType)) { - return SQL_TYPE_2_ICEBERG_TYPE_MAPPING.get(jdbcType); + Map<Integer, LogicalType> typeMap = adaptSparkEngine + ? SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING : SQL_TYPE_2_FLINK_TYPE_MAPPING; + if (typeMap.containsKey(jdbcType)) { + return typeMap.get(jdbcType); } else { throw new IllegalArgumentException("Unsupported jdbcType: " + jdbcType); } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java new file mode 100644 index 000000000..da6128942 --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.inlong.sort.base.format; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; + +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeParseException; +import java.time.temporal.TemporalAccessor; +import java.time.temporal.TemporalQueries; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE; +import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT; + +/** Tool class used to convert from {@link JsonNode} to {@link RowData}. * */ +@Internal +public class JsonToRowDataConverters implements Serializable { + + private static final long serialVersionUID = 1L; + + /** Flag indicating whether to fail if a field is missing. */ + private final boolean failOnMissingField; + + /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */ + private final boolean ignoreParseErrors; + + /** Timestamp format specification which is used to parse timestamp. */ + private final TimestampFormat timestampFormat; + + /** Wherther adapt spark sql program. */ + private final boolean adaptSpark; + + public JsonToRowDataConverters( + boolean failOnMissingField, + boolean ignoreParseErrors, + TimestampFormat timestampFormat, + boolean adaptSpark) { + this.failOnMissingField = failOnMissingField; + this.ignoreParseErrors = ignoreParseErrors; + this.timestampFormat = timestampFormat; + this.adaptSpark = adaptSpark; + } + + /** + * Runtime converter that converts {@link JsonNode}s into objects of Flink Table & SQL internal + * data structures. + */ + @FunctionalInterface + public interface JsonToRowDataConverter extends Serializable { + Object convert(JsonNode jsonNode); + } + + /** Creates a runtime converter which is null safe. */ + public JsonToRowDataConverter createConverter(LogicalType type) { + return wrapIntoNullableConverter(createNotNullConverter(type)); + } + + /** Creates a runtime converter which assuming input object is not null. */ + private JsonToRowDataConverter createNotNullConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return jsonNode -> null; + case BOOLEAN: + return this::convertToBoolean; + case TINYINT: + return jsonNode -> Byte.parseByte(jsonNode.asText().trim()); + case SMALLINT: + return jsonNode -> Short.parseShort(jsonNode.asText().trim()); + case INTEGER: + case INTERVAL_YEAR_MONTH: + return this::convertToInt; + case BIGINT: + case INTERVAL_DAY_TIME: + return this::convertToLong; + case DATE: + return this::convertToDate; + case TIME_WITHOUT_TIME_ZONE: + return this::convertToTime; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return this::convertToTimestamp; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (adaptSpark) { + return jsonNode -> { + try { + return convertToTimestampWithLocalZone(jsonNode); + } catch (DateTimeParseException e) { + return convertToTimestamp(jsonNode); + } + }; + } + return this::convertToTimestampWithLocalZone; + case FLOAT: + return this::convertToFloat; + case DOUBLE: + return this::convertToDouble; + case CHAR: + case VARCHAR: + return this::convertToString; + case BINARY: + case VARBINARY: + return this::convertToBytes; + case DECIMAL: + return createDecimalConverter((DecimalType) type); + case ARRAY: + return createArrayConverter((ArrayType) type); + case MAP: + MapType mapType = (MapType) type; + return createMapConverter( + mapType.asSummaryString(), mapType.getKeyType(), mapType.getValueType()); + case MULTISET: + MultisetType multisetType = (MultisetType) type; + return createMapConverter( + multisetType.asSummaryString(), + multisetType.getElementType(), + new IntType()); + case ROW: + return createRowConverter((RowType) type); + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + private boolean convertToBoolean(JsonNode jsonNode) { + if (jsonNode.isBoolean()) { + // avoid redundant toString and parseBoolean, for better performance + return jsonNode.asBoolean(); + } else { + return Boolean.parseBoolean(jsonNode.asText().trim()); + } + } + + private int convertToInt(JsonNode jsonNode) { + if (jsonNode.canConvertToInt()) { + // avoid redundant toString and parseInt, for better performance + return jsonNode.asInt(); + } else { + return Integer.parseInt(jsonNode.asText().trim()); + } + } + + private long convertToLong(JsonNode jsonNode) { + if (jsonNode.canConvertToLong()) { + // avoid redundant toString and parseLong, for better performance + return jsonNode.asLong(); + } else { + return Long.parseLong(jsonNode.asText().trim()); + } + } + + private double convertToDouble(JsonNode jsonNode) { + if (jsonNode.isDouble()) { + // avoid redundant toString and parseDouble, for better performance + return jsonNode.asDouble(); + } else { + return Double.parseDouble(jsonNode.asText().trim()); + } + } + + private float convertToFloat(JsonNode jsonNode) { + if (jsonNode.isDouble()) { + // avoid redundant toString and parseDouble, for better performance + return (float) jsonNode.asDouble(); + } else { + return Float.parseFloat(jsonNode.asText().trim()); + } + } + + private int convertToDate(JsonNode jsonNode) { + LocalDate date = ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate()); + return (int) date.toEpochDay(); + } + + private int convertToTime(JsonNode jsonNode) { + TemporalAccessor parsedTime = SQL_TIME_FORMAT.parse(jsonNode.asText()); + LocalTime localTime = parsedTime.query(TemporalQueries.localTime()); + + // get number of milliseconds of the day + return localTime.toSecondOfDay() * 1000; + } + + private TimestampData convertToTimestamp(JsonNode jsonNode) { + TemporalAccessor parsedTimestamp; + switch (timestampFormat) { + case SQL: + parsedTimestamp = SQL_TIMESTAMP_FORMAT.parse(jsonNode.asText()); + break; + case ISO_8601: + parsedTimestamp = ISO8601_TIMESTAMP_FORMAT.parse(jsonNode.asText()); + break; + default: + throw new TableException( + String.format( + "Unsupported timestamp format '%s'. Validator should have checked that.", + timestampFormat)); + } + LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime()); + LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate()); + + return TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, localTime)); + } + + private TimestampData convertToTimestampWithLocalZone(JsonNode jsonNode) { + TemporalAccessor parsedTimestampWithLocalZone; + switch (timestampFormat) { + case SQL: + parsedTimestampWithLocalZone = + SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText()); + break; + case ISO_8601: + parsedTimestampWithLocalZone = + ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText()); + break; + default: + throw new TableException( + String.format( + "Unsupported timestamp format '%s'. Validator should have checked that.", + timestampFormat)); + } + LocalTime localTime = parsedTimestampWithLocalZone.query(TemporalQueries.localTime()); + LocalDate localDate = parsedTimestampWithLocalZone.query(TemporalQueries.localDate()); + + return TimestampData.fromInstant( + LocalDateTime.of(localDate, localTime).toInstant(ZoneOffset.UTC)); + } + + private StringData convertToString(JsonNode jsonNode) { + if (jsonNode.isContainerNode()) { + return StringData.fromString(jsonNode.toString()); + } else { + return StringData.fromString(jsonNode.asText()); + } + } + + private byte[] convertToBytes(JsonNode jsonNode) { + try { + return jsonNode.binaryValue(); + } catch (IOException e) { + throw new JsonParseException("Unable to deserialize byte array.", e); + } + } + + private JsonToRowDataConverter createDecimalConverter(DecimalType decimalType) { + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + return jsonNode -> { + BigDecimal bigDecimal; + if (jsonNode.isBigDecimal()) { + bigDecimal = jsonNode.decimalValue(); + } else { + bigDecimal = new BigDecimal(jsonNode.asText()); + } + return DecimalData.fromBigDecimal(bigDecimal, precision, scale); + }; + } + + private JsonToRowDataConverter createArrayConverter(ArrayType arrayType) { + JsonToRowDataConverter elementConverter = createConverter(arrayType.getElementType()); + final Class<?> elementClass = + LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); + return jsonNode -> { + final ArrayNode node = (ArrayNode) jsonNode; + final Object[] array = (Object[]) Array.newInstance(elementClass, node.size()); + for (int i = 0; i < node.size(); i++) { + final JsonNode innerNode = node.get(i); + array[i] = elementConverter.convert(innerNode); + } + return new GenericArrayData(array); + }; + } + + private JsonToRowDataConverter createMapConverter( + String typeSummary, LogicalType keyType, LogicalType valueType) { + if (!LogicalTypeChecks.hasFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) { + throw new UnsupportedOperationException( + "JSON format doesn't support non-string as key type of map. " + + "The type is: " + + typeSummary); + } + final JsonToRowDataConverter keyConverter = createConverter(keyType); + final JsonToRowDataConverter valueConverter = createConverter(valueType); + + return jsonNode -> { + Iterator<Entry<String, JsonNode>> fields = jsonNode.fields(); + Map<Object, Object> result = new HashMap<>(); + while (fields.hasNext()) { + Map.Entry<String, JsonNode> entry = fields.next(); + Object key = keyConverter.convert(TextNode.valueOf(entry.getKey())); + Object value = valueConverter.convert(entry.getValue()); + result.put(key, value); + } + return new GenericMapData(result); + }; + } + + public JsonToRowDataConverter createRowConverter(RowType rowType) { + final JsonToRowDataConverter[] fieldConverters = + rowType.getFields().stream() + .map(RowType.RowField::getType) + .map(this::createConverter) + .toArray(JsonToRowDataConverter[]::new); + final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); + + return jsonNode -> { + ObjectNode node = (ObjectNode) jsonNode; + int arity = fieldNames.length; + GenericRowData row = new GenericRowData(arity); + for (int i = 0; i < arity; i++) { + String fieldName = fieldNames[i]; + JsonNode field = node.get(fieldName); + Object convertedField = convertField(fieldConverters[i], fieldName, field); + row.setField(i, convertedField); + } + return row; + }; + } + + private Object convertField( + JsonToRowDataConverter fieldConverter, String fieldName, JsonNode field) { + if (field == null) { + if (failOnMissingField) { + throw new JsonParseException("Could not find field with name '" + fieldName + "'."); + } else { + return null; + } + } else { + return fieldConverter.convert(field); + } + } + + private JsonToRowDataConverter wrapIntoNullableConverter( + JsonToRowDataConverter converter) { + return jsonNode -> { + if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) { + return null; + } + try { + return converter.convert(jsonNode); + } catch (Throwable t) { + if (!ignoreParseErrors) { + throw t; + } + return null; + } + }; + } + + /** Exception which refers to parse errors in converters. */ + private static final class JsonParseException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public JsonParseException(String message) { + super(message); + } + + public JsonParseException(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java index 77c924b95..10bc3f3d1 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java @@ -18,11 +18,14 @@ package org.apache.inlong.sort.base.sink; +import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.Map; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK; import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE; import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE; import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.TRY_IT_BEST; @@ -37,6 +40,8 @@ public class MultipleSinkOption implements Serializable { private String format; + private boolean sparkEngineEnable; + private SchemaUpdateExceptionPolicy schemaUpdatePolicy; private String databasePattern; @@ -44,10 +49,12 @@ public class MultipleSinkOption implements Serializable { private String tablePattern; public MultipleSinkOption(String format, + boolean sparkEngineEnable, SchemaUpdateExceptionPolicy schemaUpdatePolicy, String databasePattern, String tablePattern) { this.format = format; + this.sparkEngineEnable = sparkEngineEnable; this.schemaUpdatePolicy = schemaUpdatePolicy; this.databasePattern = databasePattern; this.tablePattern = tablePattern; @@ -57,6 +64,15 @@ public class MultipleSinkOption implements Serializable { return format; } + public boolean isSparkEngineEnable() { + return sparkEngineEnable; + } + + public Map<String, String> getFormatOption() { + return ImmutableMap.of( + SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK.key(), String.valueOf(isSparkEngineEnable())); + } + public SchemaUpdateExceptionPolicy getSchemaUpdatePolicy() { return schemaUpdatePolicy; } @@ -75,6 +91,7 @@ public class MultipleSinkOption implements Serializable { public static class Builder { private String format; + private boolean sparkEngineEnable; private SchemaUpdateExceptionPolicy schemaUpdatePolicy; private String databasePattern; private String tablePattern; @@ -84,6 +101,11 @@ public class MultipleSinkOption implements Serializable { return this; } + public MultipleSinkOption.Builder withSparkEngineEnable(boolean sparkEngineEnable) { + this.sparkEngineEnable = sparkEngineEnable; + return this; + } + public MultipleSinkOption.Builder withSchemaUpdatePolicy(SchemaUpdateExceptionPolicy schemaUpdatePolicy) { this.schemaUpdatePolicy = schemaUpdatePolicy; return this; @@ -100,7 +122,7 @@ public class MultipleSinkOption implements Serializable { } public MultipleSinkOption build() { - return new MultipleSinkOption(format, schemaUpdatePolicy, databasePattern, tablePattern); + return new MultipleSinkOption(format, sparkEngineEnable, schemaUpdatePolicy, databasePattern, tablePattern); } } diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java index dc7ce5ec8..29dbcb6a1 100644 --- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java +++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java @@ -37,6 +37,7 @@ import java.util.Map; * Test for {@link CanalJsonDynamicSchemaFormat} */ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTest<JsonNode> { + private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("canal-json"); @Override protected String getSource() { @@ -91,6 +92,11 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes return expectedValues; } + @Override + protected AbstractDynamicSchemaFormat<JsonNode> getDynamicSchemaFormat() { + return schemaFormat; + } + @Test @SuppressWarnings({"unchecked"}) public void testExtractPrimaryKey() throws IOException { @@ -119,10 +125,4 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes 5.18f)); Assert.assertEquals(values, rowDataList); } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Override - protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() { - return CanalJsonDynamicSchemaFormat.getInstance(); - } } diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java index 5e555a657..c0eaface9 100644 --- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java +++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java @@ -32,6 +32,7 @@ import java.util.Map; * Test for {@link DebeziumJsonDynamicSchemaFormat} */ public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTest<JsonNode> { + private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("debezium-json"); @Override protected String getSource() { @@ -98,6 +99,6 @@ public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBase @SuppressWarnings({"unchecked", "rawtypes"}) @Override protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() { - return DebeziumJsonDynamicSchemaFormat.getInstance(); + return schemaFormat; } } diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java index fb268b9a7..d4a3dd188 100644 --- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java +++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java @@ -36,6 +36,7 @@ import java.util.Map; * Test for {@link DebeziumJsonDynamicSchemaFormat} */ public class DebeziumJsonDynamicSchemaFormatWithSchemaTest extends DynamicSchemaFormatBaseTest<JsonNode> { + private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("debezium-json"); @Override protected String getSource() { @@ -270,6 +271,6 @@ public class DebeziumJsonDynamicSchemaFormatWithSchemaTest extends DynamicSchema @SuppressWarnings({"unchecked", "rawtypes"}) @Override protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() { - return DebeziumJsonDynamicSchemaFormat.getInstance(); + return schemaFormat; } } diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java index f3c263f93..30ef92b69 100644 --- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java @@ -35,16 +35,13 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.utils.TableSchemaUtils; -import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat; import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; import java.time.Duration; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.stream.Collectors; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT; @@ -332,8 +329,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory + "is not allowed blank when the option 'sink.multiple.enable' is 'true'"); } DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat); - List<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.stream().map( - AbstractDynamicSchemaFormat::identifier).collect(Collectors.toList()); + Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet(); if (!supportFormats.contains(sinkMultipleFormat)) { throw new ValidationException(String.format( "Unsupported value '%s' for '%s'. " diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java index 4852bca33..040e73df6 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java @@ -55,6 +55,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK; import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG; /** @@ -241,6 +242,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami options.add(SINK_MULTIPLE_DATABASE_PATTERN); options.add(SINK_MULTIPLE_TABLE_PATTERN); options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY); + options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK); return options; } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java index ee7cf2c89..67ca5fdf2 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java @@ -49,6 +49,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK; import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG; /** @@ -104,6 +105,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, .multipleSink(tableOptions.get(SINK_MULTIPLE_ENABLE)) .multipleSinkOption(MultipleSinkOption.builder() .withFormat(tableOptions.get(SINK_MULTIPLE_FORMAT)) + .withSparkEngineEnable(tableOptions.get(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK)) .withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN)) .withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN)) .withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY)) diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java index a0a9092c6..378f040d9 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java @@ -94,7 +94,9 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi this.catalog = catalogLoader.loadCatalog(); this.asNamespaceCatalog = catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null; - this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat()); + this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat( + multipleSinkOption.getFormat(), multipleSinkOption.getFormatOption()); + this.processingTimeService = getRuntimeContext().getProcessingTimeService(); processingTimeService.registerTimer( processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL, this); diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java index 480d55948..69716dfb6 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java @@ -51,7 +51,6 @@ import org.apache.flink.table.formats.raw.RawFormatSerializationSchema; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.types.RowKind; -import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat; import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; import org.apache.inlong.sort.kafka.KafkaDynamicSink; import org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner; @@ -66,7 +65,6 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; -import java.util.stream.Collectors; import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS_PREFIX; import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FORMAT; @@ -440,8 +438,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna if (valueEncodingFormat instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(sinkMultipleFormat)) { DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat); - List<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.stream().map( - AbstractDynamicSchemaFormat::identifier).collect(Collectors.toList()); + Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet(); if (!supportFormats.contains(sinkMultipleFormat)) { throw new ValidationException(String.format( "Unsupported value '%s' for '%s'. "
