This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new c4a1f6f [feature]Support debezium table schema changes (#14) c4a1f6f is described below commit c4a1f6f24c67275a1ff2b97f440e4e815755dec9 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Wed May 8 16:47:16 2024 +0800 [feature]Support debezium table schema changes (#14) --- pom.xml | 8 +- .../doris/kafka/connector/cfg/DorisOptions.java | 11 +- .../connector/cfg/DorisSinkConnectorConfig.java | 1 + .../connector/converter/RecordDescriptor.java | 56 ++++- .../type/debezium/AbstractDebeziumTimeType.java | 2 +- .../kafka/connector/dialect/DialectProperties.java | 28 +++ .../doris/kafka/connector/dialect/DorisType.java | 47 ++++ .../kafka/connector/dialect/mysql/MysqlType.java | 213 +++++++++++++++++ .../connector/exception/SchemaChangeException.java | 46 ++++ .../kafka/connector/model/ColumnDescriptor.java | 91 ++++++++ .../kafka/connector/model/TableDescriptor.java | 100 ++++++++ .../doris/kafka/connector/model/doris/Field.java | 148 ++++++++++++ .../doris/kafka/connector/model/doris/Schema.java | 110 +++++++++ .../connector/service/DorisDefaultSinkService.java | 26 ++- .../connector/service/DorisSystemService.java | 98 ++++++++ .../doris/kafka/connector/service/RestService.java | 59 +++++ .../kafka/connector/utils/ConfigCheckUtils.java | 29 +++ .../kafka/connector/utils/HttpGetWithEntity.java | 37 +++ .../kafka/connector/writer/CopyIntoWriter.java | 7 + .../doris/kafka/connector/writer/DorisWriter.java | 7 +- .../kafka/connector/writer/StreamLoadWriter.java | 7 + .../writer/schema/DebeziumSchemaChange.java | 252 +++++++++++++++++++++ .../writer/schema/SchemaChangeHelper.java | 159 +++++++++++++ .../writer/schema/SchemaChangeManager.java | 142 ++++++++++++ .../connector/converter/TestRecordService.java | 4 +- .../connector/writer/TestDebeziumSchemaChange.java | 133 +++++++++++ 26 files changed, 1806 insertions(+), 15 deletions(-) diff --git a/pom.xml b/pom.xml index 
d062676..282650f 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ <metrics.version>4.2.25</metrics.version> <spotless.version>2.4.2</spotless.version> <debezium.version>1.9.8.Final</debezium.version> - <mockito.version>2.27.0</mockito.version> + <mockito.version>4.2.0</mockito.version> <junit.version>4.13.1</junit.version> <slf4j.version>1.7.25</slf4j.version> <mysql-connector.version>8.0.30</mysql-connector.version> @@ -181,6 +181,12 @@ <version>${mockito.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> <!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core --> <dependency> diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index 6183869..e9f1297 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -44,7 +44,7 @@ public class DorisOptions { private final String password; private final String database; private final Map<String, String> topicMap; - + private final String schemaTopic; private final int fileSize; private final int recordNum; private long flushTime; @@ -105,6 +105,7 @@ public class DorisOptions { this.flushTime = DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN; } this.topicMap = getTopicToTableMap(config); + this.schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC); enableCustomJMX = DorisSinkConnectorConfig.JMX_OPT_DEFAULT; if (config.containsKey(DorisSinkConnectorConfig.JMX_OPT)) { @@ -196,6 +197,10 @@ public class DorisOptions { return topicMap.get(topic); } + public Map<String, String> getTopicMap() { + return topicMap; + } + public String getQueryUrl() { List<String> queryUrls = getQueryUrls(); return queryUrls.get(0); @@ -288,6 +293,10 
@@ public class DorisOptions { return enableDelete; } + public String getSchemaTopic() { + return schemaTopic; + } + /** * parse topic to table map * diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java index 58b6a62..94ea08e 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java @@ -78,6 +78,7 @@ public class DorisSinkConnectorConfig { public static final String DELIVERY_GUARANTEE_DEFAULT = DeliveryGuarantee.AT_LEAST_ONCE.name(); public static final String CONVERT_MODE = "converter.mode"; public static final String CONVERT_MODE_DEFAULT = ConverterMode.NORMAL.getName(); + public static final String SCHEMA_TOPIC = "schema.topic"; // Prefix for Doris StreamLoad specific properties. public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java index 188f103..11d097f 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java @@ -23,6 +23,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import org.apache.doris.kafka.connector.dialect.mysql.MysqlType; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -120,6 +121,8 @@ public class RecordDescriptor { private final String name; private final String schemaTypeName; private final String schemaName; + private String comment; + private String defaultValue; public FieldDescriptor( Schema schema, String name, String schemaTypeName, String 
schemaName) { @@ -129,6 +132,18 @@ public class RecordDescriptor { this.schemaName = schemaName; } + public FieldDescriptor( + Schema schema, + String name, + String schemaTypeName, + String schemaName, + String comment, + String defaultValue) { + this(schema, name, schemaTypeName, schemaName); + this.comment = comment; + this.defaultValue = defaultValue; + } + public String getName() { return name; } @@ -144,11 +159,20 @@ public class RecordDescriptor { public String getSchemaTypeName() { return schemaTypeName; } + + public String getComment() { + return comment; + } + + public String getDefaultValue() { + return defaultValue; + } } public static class Builder { private SinkRecord sinkRecord; + private Struct tableChange; // Internal build state private final List<String> keyFieldNames = new ArrayList<>(); @@ -160,11 +184,20 @@ public class RecordDescriptor { return this; } + public Builder withTableChange(Struct tableChange) { + this.tableChange = tableChange; + return this; + } + public RecordDescriptor build() { Objects.requireNonNull(sinkRecord, "The sink record must be provided."); final boolean flattened = !isTombstone(sinkRecord) && isFlattened(sinkRecord); - readSinkRecordNonKeyData(sinkRecord, flattened); + if (Objects.nonNull(tableChange)) { + readTableChangeData(tableChange); + } else { + readSinkRecordNonKeyData(sinkRecord, flattened); + } return new RecordDescriptor( sinkRecord, @@ -175,6 +208,27 @@ public class RecordDescriptor { flattened); } + private void readTableChangeData(final Struct tableChange) { + Struct tableChangeTable = tableChange.getStruct("table"); + List<Object> tableChangeColumns = tableChangeTable.getArray("columns"); + for (Object column : tableChangeColumns) { + Struct columnStruct = (Struct) column; + Schema schema = columnStruct.schema(); + String name = columnStruct.getString("name"); + String typeName = columnStruct.getString("typeName"); + Integer length = columnStruct.getInt32("length"); + Integer scale = 
columnStruct.getInt32("scale"); + String dorisType = MysqlType.toDorisType(typeName, length, scale); + String comment = columnStruct.getString("comment"); + String defaultValue = columnStruct.getString("defaultValueExpression"); + nonKeyFieldNames.add(name); + allFields.put( + name, + new FieldDescriptor( + schema, name, dorisType, schema.name(), comment, defaultValue)); + } + } + private boolean isFlattened(SinkRecord record) { return record.valueSchema().name() == null || !record.valueSchema().name().contains("Envelope"); diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java index 9cde668..c673249 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java @@ -32,7 +32,7 @@ public abstract class AbstractDebeziumTimeType extends AbstractTimeType { } if (sourceValue instanceof Number) { final LocalTime localTime = getLocalTime((Number) sourceValue); - return String.format("'%s'", DateTimeFormatter.ISO_TIME.format(localTime)); + return String.format("%s", DateTimeFormatter.ISO_TIME.format(localTime)); } throw new ConnectException( String.format( diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java b/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java new file mode 100644 index 0000000..62e4cab --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.dialect; + +public class DialectProperties { + + /* Max precision of datetime type of Doris. */ + public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6; + + public static final int TIMESTAMP_TYPE_MAX_PRECISION = 9; +} diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java b/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java new file mode 100644 index 0000000..89b416a --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.doris.kafka.connector.dialect; + +public class DorisType { + public static final String BOOLEAN = "BOOLEAN"; + public static final String TINYINT = "TINYINT"; + public static final String SMALLINT = "SMALLINT"; + public static final String INT = "INT"; + public static final String BIGINT = "BIGINT"; + public static final String LARGEINT = "LARGEINT"; + public static final String FLOAT = "FLOAT"; + public static final String DOUBLE = "DOUBLE"; + public static final String DECIMAL = "DECIMAL"; + public static final String DECIMAL_V3 = "DECIMALV3"; + public static final String DATE = "DATE"; + public static final String DATE_V2 = "DATEV2"; + public static final String DATETIME = "DATETIME"; + public static final String DATETIME_V2 = "DATETIMEV2"; + public static final String CHAR = "CHAR"; + public static final String VARCHAR = "VARCHAR"; + public static final String STRING = "STRING"; + public static final String HLL = "HLL"; + public static final String BITMAP = "BITMAP"; + public static final String ARRAY = "ARRAY"; + public static final String JSONB = "JSONB"; + public static final String JSON = "JSON"; + public static final String MAP = "MAP"; + public static final String STRUCT = "STRUCT"; +} diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java b/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java new file mode 100644 index 0000000..663aadf --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.dialect.mysql; + +import static org.apache.doris.kafka.connector.dialect.DialectProperties.MAX_SUPPORTED_DATE_TIME_PRECISION; +import static org.apache.doris.kafka.connector.dialect.DialectProperties.TIMESTAMP_TYPE_MAX_PRECISION; + +import com.google.common.base.Preconditions; +import org.apache.doris.kafka.connector.dialect.DorisType; + +public class MysqlType { + + // MySQL driver returns width of timestamp types instead of precision. + // 19 characters are used for zero-precision timestamps while others + // require 19 + precision + 1 characters with the additional character + // required for the decimal separator. 
+ private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19; + private static final String BIT = "BIT"; + private static final String BOOLEAN = "BOOLEAN"; + private static final String BOOL = "BOOL"; + private static final String TINYINT = "TINYINT"; + private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED"; + private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL"; + private static final String SMALLINT = "SMALLINT"; + private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED"; + private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT UNSIGNED ZEROFILL"; + private static final String MEDIUMINT = "MEDIUMINT"; + private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED"; + private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT UNSIGNED ZEROFILL"; + private static final String INT = "INT"; + private static final String INT_UNSIGNED = "INT UNSIGNED"; + private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL"; + private static final String BIGINT = "BIGINT"; + private static final String SERIAL = "SERIAL"; + private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED"; + private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED ZEROFILL"; + private static final String REAL = "REAL"; + private static final String REAL_UNSIGNED = "REAL UNSIGNED"; + private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED ZEROFILL"; + private static final String FLOAT = "FLOAT"; + private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED"; + private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED ZEROFILL"; + private static final String DOUBLE = "DOUBLE"; + private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; + private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED ZEROFILL"; + private static final String DOUBLE_PRECISION = "DOUBLE PRECISION"; + private static final String DOUBLE_PRECISION_UNSIGNED = 
"DOUBLE PRECISION UNSIGNED"; + private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL = + "DOUBLE PRECISION UNSIGNED ZEROFILL"; + private static final String NUMERIC = "NUMERIC"; + private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED"; + private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED ZEROFILL"; + private static final String FIXED = "FIXED"; + private static final String FIXED_UNSIGNED = "FIXED UNSIGNED"; + private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED ZEROFILL"; + private static final String DECIMAL = "DECIMAL"; + private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; + private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED ZEROFILL"; + private static final String CHAR = "CHAR"; + private static final String VARCHAR = "VARCHAR"; + private static final String TINYTEXT = "TINYTEXT"; + private static final String MEDIUMTEXT = "MEDIUMTEXT"; + private static final String TEXT = "TEXT"; + private static final String LONGTEXT = "LONGTEXT"; + private static final String DATE = "DATE"; + private static final String TIME = "TIME"; + private static final String DATETIME = "DATETIME"; + private static final String TIMESTAMP = "TIMESTAMP"; + private static final String YEAR = "YEAR"; + private static final String BINARY = "BINARY"; + private static final String VARBINARY = "VARBINARY"; + private static final String TINYBLOB = "TINYBLOB"; + private static final String MEDIUMBLOB = "MEDIUMBLOB"; + private static final String BLOB = "BLOB"; + private static final String LONGBLOB = "LONGBLOB"; + private static final String JSON = "JSON"; + private static final String ENUM = "ENUM"; + private static final String SET = "SET"; + + public static String toDorisType(String type, Integer length, Integer scale) { + switch (type.toUpperCase()) { + case BIT: + case BOOLEAN: + case BOOL: + return DorisType.BOOLEAN; + case TINYINT: + return DorisType.TINYINT; + case TINYINT_UNSIGNED: + case 
TINYINT_UNSIGNED_ZEROFILL: + case SMALLINT: + return DorisType.SMALLINT; + case SMALLINT_UNSIGNED: + case SMALLINT_UNSIGNED_ZEROFILL: + case INT: + case MEDIUMINT: + case YEAR: + return DorisType.INT; + case INT_UNSIGNED: + case INT_UNSIGNED_ZEROFILL: + case MEDIUMINT_UNSIGNED: + case MEDIUMINT_UNSIGNED_ZEROFILL: + case BIGINT: + return DorisType.BIGINT; + case BIGINT_UNSIGNED: + case BIGINT_UNSIGNED_ZEROFILL: + return DorisType.LARGEINT; + case FLOAT: + case FLOAT_UNSIGNED: + case FLOAT_UNSIGNED_ZEROFILL: + return DorisType.FLOAT; + case REAL: + case REAL_UNSIGNED: + case REAL_UNSIGNED_ZEROFILL: + case DOUBLE: + case DOUBLE_UNSIGNED: + case DOUBLE_UNSIGNED_ZEROFILL: + case DOUBLE_PRECISION: + case DOUBLE_PRECISION_UNSIGNED: + case DOUBLE_PRECISION_UNSIGNED_ZEROFILL: + return DorisType.DOUBLE; + case NUMERIC: + case NUMERIC_UNSIGNED: + case NUMERIC_UNSIGNED_ZEROFILL: + case FIXED: + case FIXED_UNSIGNED: + case FIXED_UNSIGNED_ZEROFILL: + case DECIMAL: + case DECIMAL_UNSIGNED: + case DECIMAL_UNSIGNED_ZEROFILL: + return length != null && length <= 38 + ? String.format( + "%s(%s,%s)", + DorisType.DECIMAL_V3, + length, + scale != null && scale >= 0 ? scale : 0) + : DorisType.STRING; + case DATE: + return DorisType.DATE_V2; + case DATETIME: + case TIMESTAMP: + // default precision is 0 + // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html + if (length == null + || length <= 0 + || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) { + return String.format("%s(%s)", DorisType.DATETIME_V2, 0); + } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) { + // Timestamp with a fraction of seconds. + // For example, 2024-01-01 01:01:01.1 + // The decimal point will occupy 1 character. + // Thus, the length of the timestamp is 21.
+ return String.format( + "%s(%s)", + DorisType.DATETIME_V2, + Math.min( + length - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1, + MAX_SUPPORTED_DATE_TIME_PRECISION)); + } else if (length <= TIMESTAMP_TYPE_MAX_PRECISION) { + // For Debezium JSON data, the timestamp/datetime length ranges from 0 to 9. + return String.format( + "%s(%s)", + DorisType.DATETIME_V2, + Math.min(length, MAX_SUPPORTED_DATE_TIME_PRECISION)); + } else { + throw new UnsupportedOperationException( + "Unsupported length: " + + length + + " for MySQL TIMESTAMP/DATETIME types"); + } + case CHAR: + case VARCHAR: + Preconditions.checkNotNull(length); + return length * 3 > 65533 + ? DorisType.STRING + : String.format("%s(%s)", DorisType.VARCHAR, length * 3); + case TINYTEXT: + case TEXT: + case MEDIUMTEXT: + case LONGTEXT: + case ENUM: + case TIME: + case TINYBLOB: + case BLOB: + case MEDIUMBLOB: + case LONGBLOB: + case BINARY: + case VARBINARY: + case SET: + return DorisType.STRING; + case JSON: + return DorisType.JSONB; + default: + throw new UnsupportedOperationException("Unsupported MySQL Type: " + type); + } + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/exception/SchemaChangeException.java b/src/main/java/org/apache/doris/kafka/connector/exception/SchemaChangeException.java new file mode 100644 index 0000000..4c0ce46 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/exception/SchemaChangeException.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.exception; + +/** Doris Schema Change run exception. */ +public class SchemaChangeException extends RuntimeException { + public SchemaChangeException() { + super(); + } + + public SchemaChangeException(String message) { + super(message); + } + + public SchemaChangeException(String message, Throwable cause) { + super(message, cause); + } + + public SchemaChangeException(Throwable cause) { + super(cause); + } + + protected SchemaChangeException( + String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/model/ColumnDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/model/ColumnDescriptor.java new file mode 100644 index 0000000..f34c9fa --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/model/ColumnDescriptor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.model; + +import java.util.Objects; + +public class ColumnDescriptor { + private final String columnName; + private final String typeName; + private final String comment; + private final String defaultValue; + + private ColumnDescriptor( + String columnName, String typeName, String comment, String defaultValue) { + this.columnName = columnName; + this.typeName = typeName; + this.comment = comment; + this.defaultValue = defaultValue; + } + + public String getColumnName() { + return columnName; + } + + public String getTypeName() { + return typeName; + } + + public String getDefaultValue() { + return defaultValue; + } + + public String getComment() { + return comment; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String columnName; + private String typeName; + private String comment; + private String defaultValue; + + public Builder columnName(String columnName) { + this.columnName = columnName; + return this; + } + + public Builder typeName(String typeName) { + this.typeName = typeName; + return this; + } + + public Builder comment(String comment) { + this.comment = comment; + return this; + } + + public Builder defaultValue(String defaultValue) { + this.defaultValue = defaultValue; + return this; + } + + public ColumnDescriptor build() { + Objects.requireNonNull(columnName, "A column name is required"); + Objects.requireNonNull(typeName, "A type name is required"); + + return new ColumnDescriptor(columnName, typeName, comment, 
defaultValue); + } + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/model/TableDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/model/TableDescriptor.java new file mode 100644 index 0000000..4cf14f4 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/model/TableDescriptor.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.doris.kafka.connector.model; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class TableDescriptor { + private final String tableName; + private final String tableType; + private final Map<String, ColumnDescriptor> columns = new LinkedHashMap<>(); + + private TableDescriptor(String tableName, String tableType, List<ColumnDescriptor> columns) { + this.tableName = tableName; + this.tableType = tableType; + columns.forEach(c -> this.columns.put(c.getColumnName(), c)); + } + + public static Builder builder() { + return new Builder(); + } + + public String getTableName() { + return tableName; + } + + public String getTableType() { + return tableType; + } + + public Collection<ColumnDescriptor> getColumns() { + return columns.values(); + } + + public ColumnDescriptor getColumnByName(String columnName) { + return columns.get(columnName); + } + + public boolean hasColumn(String columnName) { + return columns.containsKey(columnName); + } + + public static class Builder { + private String schemaName; + private String tableName; + private String tableType; + private final List<ColumnDescriptor> columns = new ArrayList<>(); + + private Builder() {} + + public Builder schemaName(String schemaName) { + this.schemaName = schemaName; + return this; + } + + public Builder tableName(String tableName) { + this.tableName = tableName; + return this; + } + + public Builder type(String tableType) { + this.tableType = tableType; + return this; + } + + public Builder column(ColumnDescriptor column) { + this.columns.add(column); + return this; + } + + public Builder columns(List<ColumnDescriptor> columns) { + this.columns.addAll(columns); + return this; + } + + public TableDescriptor build() { + return new TableDescriptor(tableName, tableType, columns); + } + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/model/doris/Field.java 
b/src/main/java/org/apache/doris/kafka/connector/model/doris/Field.java new file mode 100644 index 0000000..881a4a9 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/model/doris/Field.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.doris.kafka.connector.model.doris;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;

/**
 * A single column entry of a Doris table schema as returned by the FE {@code _schema} REST API.
 *
 * <p>Jackson maps the JSON attributes (including the snake_case {@code aggregation_type}) onto
 * this bean via the {@link JsonProperty} annotations.
 *
 * <p>NOTE(review): {@code aggregationType} is intentionally left out of {@link #equals(Object)},
 * {@link #hashCode()} and {@link #toString()} (as in the original implementation) — confirm that
 * schema comparison is not supposed to consider it.
 */
public class Field {
    @JsonProperty(value = "name")
    private String name;

    @JsonProperty(value = "type")
    private String type;

    @JsonProperty(value = "comment")
    private String comment;

    @JsonProperty(value = "precision")
    private int precision;

    @JsonProperty(value = "scale")
    private int scale;

    @JsonProperty(value = "aggregation_type")
    private String aggregationType;

    /** No-arg constructor required by Jackson deserialization. */
    public Field() {}

    public Field(
            String name,
            String type,
            String comment,
            int precision,
            int scale,
            String aggregationType) {
        this.name = name;
        this.type = type;
        this.comment = comment;
        this.precision = precision;
        this.scale = scale;
        this.aggregationType = aggregationType;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getComment() {
        return comment;
    }

    public void setComment(String comment) {
        this.comment = comment;
    }

    public int getPrecision() {
        return precision;
    }

    public void setPrecision(int precision) {
        this.precision = precision;
    }

    public int getScale() {
        return scale;
    }

    public void setScale(int scale) {
        this.scale = scale;
    }

    public String getAggregationType() {
        return aggregationType;
    }

    public void setAggregationType(String aggregationType) {
        this.aggregationType = aggregationType;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Field other = (Field) o;
        return precision == other.precision
                && scale == other.scale
                && Objects.equals(name, other.name)
                && Objects.equals(type, other.type)
                && Objects.equals(comment, other.comment);
    }

    @Override
    public int hashCode() {
        // Same tuple as equals(): name, type, comment, precision, scale.
        return Objects.hash(name, type, comment, precision, scale);
    }

    @Override
    public String toString() {
        return new StringBuilder("Field{")
                .append("name='")
                .append(name)
                .append('\'')
                .append(", type='")
                .append(type)
                .append('\'')
                .append(", comment='")
                .append(comment)
                .append('\'')
                .append(", precision=")
                .append(precision)
                .append(", scale=")
                .append(scale)
                .append('}')
                .toString();
    }
}
package org.apache.doris.kafka.connector.model.doris;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * Table schema payload returned by the Doris FE {@code _schema} REST endpoint: a status code, the
 * table's keys type (e.g. UNIQUE_KEYS) and the ordered list of column {@link Field}s.
 */
public class Schema {
    // 0 means the FE answered successfully; other values are FE-side error codes.
    private int status = 0;
    private String keysType;
    private List<Field> properties;

    public Schema() {
        properties = new ArrayList<>();
    }

    /** Pre-sizes the column list when the field count is known up front. */
    public Schema(int fieldCount) {
        properties = new ArrayList<>(fieldCount);
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public String getKeysType() {
        return keysType;
    }

    public void setKeysType(String keysType) {
        this.keysType = keysType;
    }

    public List<Field> getProperties() {
        return properties;
    }

    public void setProperties(List<Field> properties) {
        this.properties = properties;
    }

    /** Appends a column built from the given attributes. */
    public void put(
            String name,
            String type,
            String comment,
            int scale,
            int precision,
            String aggregationType) {
        // BUGFIX: the original forwarded (scale, precision) positionally into
        // Field(name, type, comment, precision, scale, aggregationType), silently swapping the
        // two values. Forward each argument to its matching Field slot instead.
        properties.add(new Field(name, type, comment, precision, scale, aggregationType));
    }

    public void put(Field f) {
        properties.add(f);
    }

    /**
     * Returns the column at {@code index}.
     *
     * @throws IndexOutOfBoundsException if {@code index} is out of range.
     */
    public Field get(int index) {
        if (index >= properties.size()) {
            // BUGFIX: repaired the mojibake colon ("sizeļ¼") in the original message.
            throw new IndexOutOfBoundsException(
                    "Index: " + index + ", Fields size: " + properties.size());
        }
        return properties.get(index);
    }

    public int size() {
        return properties.size();
    }

    // NOTE(review): keysType is deliberately excluded from equals/hashCode (kept as in the
    // original) — confirm callers only need to compare status + columns.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Schema schema = (Schema) o;
        return status == schema.status && Objects.equals(properties, schema.properties);
    }

    @Override
    public int hashCode() {
        return Objects.hash(status, properties);
    }

    @Override
    public String toString() {
        return "Schema{" + "status=" + status + ", properties=" + properties + '}';
    }
}
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java index f25e4b9..29810e4 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java @@ -23,6 +23,7 @@ import com.codahale.metrics.MetricRegistry; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import org.apache.doris.kafka.connector.DorisSinkTask; import org.apache.doris.kafka.connector.cfg.DorisOptions; import org.apache.doris.kafka.connector.connection.ConnectionProvider; @@ -34,6 +35,7 @@ import org.apache.doris.kafka.connector.writer.CopyIntoWriter; import org.apache.doris.kafka.connector.writer.DorisWriter; import org.apache.doris.kafka.connector.writer.StreamLoadWriter; import org.apache.doris.kafka.connector.writer.load.LoadModel; +import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; @@ -86,15 +88,23 @@ public class DorisDefaultSinkService implements DorisSinkService { if (writer.containsKey(nameIndex)) { LOG.info("already start task"); } else { - LoadModel loadModel = dorisOptions.getLoadModel(); + DorisWriter dorisWriter; String topic = topicPartition.topic(); int partition = topicPartition.partition(); - DorisWriter dorisWriter = - LoadModel.COPY_INTO.equals(loadModel) - ? 
new CopyIntoWriter( - topic, partition, dorisOptions, conn, connectMonitor) - : new StreamLoadWriter( - topic, partition, dorisOptions, conn, connectMonitor); + String schemaChangeTopic = dorisOptions.getSchemaTopic(); + if (Objects.nonNull(schemaChangeTopic) && schemaChangeTopic.equals(topic)) { + dorisWriter = + new DebeziumSchemaChange( + topic, partition, dorisOptions, conn, connectMonitor); + } else { + LoadModel loadModel = dorisOptions.getLoadModel(); + dorisWriter = + LoadModel.COPY_INTO.equals(loadModel) + ? new CopyIntoWriter( + topic, partition, dorisOptions, conn, connectMonitor) + : new StreamLoadWriter( + topic, partition, dorisOptions, conn, connectMonitor); + } writer.put(nameIndex, dorisWriter); metricsJmxReporter.start(); } @@ -119,7 +129,7 @@ public class DorisDefaultSinkService implements DorisSinkService { // check all sink writer to see if they need to be flushed for (DorisWriter writer : writer.values()) { // Time based flushing - if (writer.shouldFlush()) { + if (!(writer instanceof DebeziumSchemaChange) && writer.shouldFlush()) { writer.flushBuffer(); } } diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java new file mode 100644 index 0000000..2365aa4 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.doris.kafka.connector.service;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import org.apache.commons.compress.utils.Lists;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
import org.apache.doris.kafka.connector.exception.DorisException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Read-only helper around the Doris information_schema catalog, used to check whether target
 * databases/tables exist before attempting a schema change.
 */
public class DorisSystemService {

    private static final Logger LOG = LoggerFactory.getLogger(DorisSystemService.class);

    // Schemas shipped with Doris itself; filtered out of listDatabases().
    private static final List<String> builtinDatabases =
            Collections.singletonList("information_schema");

    private final JdbcConnectionProvider jdbcConnectionProvider;

    public DorisSystemService(DorisOptions dorisOptions) {
        this.jdbcConnectionProvider = new JdbcConnectionProvider(dorisOptions);
    }

    /** Returns true when {@code database} exists and contains {@code table}. */
    public boolean tableExists(String database, String table) {
        return databaseExists(database) && listTables(database).contains(table);
    }

    public boolean databaseExists(String database) {
        return listDatabases().contains(database);
    }

    /** Lists all user databases; built-in schemas are excluded. */
    public List<String> listDatabases() {
        return extractColumnValuesBySQL(
                "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
                1,
                dbName -> !builtinDatabases.contains(dbName));
    }

    /**
     * Lists all table names of the given database.
     *
     * @throws DorisException if the database does not exist.
     */
    public List<String> listTables(String databaseName) {
        if (!databaseExists(databaseName)) {
            // BUGFIX: original message was "database<name> is not exists" (missing space,
            // broken grammar).
            throw new DorisException("database " + databaseName + " does not exist");
        }
        return extractColumnValuesBySQL(
                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
                1,
                null,
                databaseName);
    }

    /**
     * Runs {@code sql} and collects the string values of column {@code columnIndex}, optionally
     * filtered by {@code filterFunc}. {@code params} are bound positionally as statement
     * parameters.
     *
     * @throws DorisException when the query cannot be executed.
     */
    public List<String> extractColumnValuesBySQL(
            String sql, int columnIndex, Predicate<String> filterFunc, Object... params) {

        List<String> columnValues = Lists.newArrayList();
        try (PreparedStatement ps =
                jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
            if (Objects.nonNull(params) && params.length > 0) {
                for (int i = 0; i < params.length; i++) {
                    ps.setObject(i + 1, params[i]);
                }
            }
            // Close the ResultSet deterministically instead of relying on the statement close.
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String columnValue = rs.getString(columnIndex);
                    if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
                        columnValues.add(columnValue);
                    }
                }
            }
            return columnValues;
        } catch (Exception e) {
            LOG.error("The following SQL query could not be executed: {}", sql, e);
            throw new DorisException(
                    String.format("The following SQL query could not be executed: %s", sql), e);
        }
    }
}
org.apache.doris.kafka.connector.exception.DorisException; +import org.apache.doris.kafka.connector.exception.SchemaChangeException; import org.apache.doris.kafka.connector.model.BackendV2; import org.apache.doris.kafka.connector.model.LoadOperation; +import org.apache.doris.kafka.connector.model.doris.Schema; import org.apache.doris.kafka.connector.utils.BackoffAndRetryUtils; +import org.apache.http.HttpHeaders; import org.apache.http.HttpStatus; import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.slf4j.Logger; public class RestService { private static final String BACKENDS_V2 = "/api/backends?is_alive=true"; + private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); /** * get Doris BE nodes to request. @@ -285,4 +296,52 @@ public class RestService { logger.debug("Parsing schema result is '{}'.", backendRows); return backendRows; } + + /** Get table schema from doris. */ + public static Schema getSchema( + DorisOptions dorisOptions, String db, String table, Logger logger) { + logger.trace("start get " + db + "." 
+ table + " schema from doris."); + Object responseData = null; + try { + String tableSchemaUri = + String.format(TABLE_SCHEMA_API, dorisOptions.getHttpUrl(), db, table); + HttpGet httpGet = new HttpGet(tableSchemaUri); + httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader(dorisOptions)); + Map<String, Object> responseMap = handleResponse(httpGet, logger); + responseData = responseMap.get("data"); + String schemaStr = OBJECT_MAPPER.writeValueAsString(responseData); + return OBJECT_MAPPER.readValue(schemaStr, Schema.class); + } catch (JsonProcessingException | IllegalArgumentException e) { + throw new SchemaChangeException("can not parse response schema " + responseData, e); + } + } + + private static String authHeader(DorisOptions dorisOptions) { + return "Basic " + + new String( + org.apache.commons.codec.binary.Base64.encodeBase64( + (dorisOptions.getUser() + ":" + dorisOptions.getPassword()) + .getBytes(StandardCharsets.UTF_8))); + } + + private static Map handleResponse(HttpUriRequest request, Logger logger) { + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + CloseableHttpResponse response = httpclient.execute(request); + final int statusCode = response.getStatusLine().getStatusCode(); + final String reasonPhrase = response.getStatusLine().getReasonPhrase(); + if (statusCode == 200 && response.getEntity() != null) { + String responseEntity = EntityUtils.toString(response.getEntity()); + return OBJECT_MAPPER.readValue(responseEntity, Map.class); + } else { + throw new SchemaChangeException( + "Failed to schemaChange, status: " + + statusCode + + ", reason: " + + reasonPhrase); + } + } catch (Exception e) { + logger.trace("SchemaChange request error,", e); + throw new SchemaChangeException("SchemaChange request error with " + e.getMessage()); + } + } } diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java index 2356f7d..84d3f90 
100644 --- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java +++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java @@ -19,9 +19,13 @@ package org.apache.doris.kafka.connector.utils; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; import org.apache.doris.kafka.connector.exception.ArgumentsException; import org.apache.doris.kafka.connector.exception.DorisException; @@ -73,6 +77,31 @@ public class ConfigCheckUtils { configIsValid = false; } + String schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC); + if (StringUtils.isNotEmpty(schemaTopic)) { + schemaTopic = schemaTopic.trim(); + if (!topics.isEmpty()) { + List<String> topicList = + Arrays.stream(topics.split(",")).collect(Collectors.toList()); + if (!topicList.contains(schemaTopic)) { + LOG.error( + "schema.topic is not included in topics list, please add! " + + " schema.topic={}, topics={}", + schemaTopic, + topics); + configIsValid = false; + } + } + if (!topicsRegex.isEmpty() && !topicsRegex.equals(schemaTopic)) { + LOG.error( + "topics.regex must equals schema.topic. please check again!" 
+ + " topics.regex={}, schema.topic={}", + topicsRegex, + schemaTopic); + configIsValid = false; + } + } + if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP) && parseTopicToTableMap(config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)) == null) { diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java b/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java new file mode 100644 index 0000000..b36a1d5 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.doris.kafka.connector.utils;

import java.net.URI;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;

/**
 * An HTTP GET request that is allowed to carry a body. The stock {@code HttpGet} cannot enclose
 * an entity, so this subclass of {@link HttpEntityEnclosingRequestBase} reports the GET method
 * while inheriting entity support.
 */
public class HttpGetWithEntity extends HttpEntityEnclosingRequestBase {

    private static final String METHOD_NAME = "GET";

    public HttpGetWithEntity(final String uri) {
        super();
        setURI(URI.create(uri));
    }

    @Override
    public String getMethod() {
        return METHOD_NAME;
    }
}
DorisWriter( String topic, @@ -100,7 +101,9 @@ public abstract class DorisWriter { /** read offset from doris */ public abstract void fetchOffset(); - public void insert(final SinkRecord record) { + public void insert(final SinkRecord record) {} + + protected void initRecord(final SinkRecord record) { // init offset if (!hasInitialized && DeliveryGuarantee.EXACTLY_ONCE.equals(dorisOptions.getDeliveryGuarantee())) { @@ -113,7 +116,9 @@ public abstract class DorisWriter { fetchOffset(); this.hasInitialized = true; } + } + protected void insertRecord(final SinkRecord record) { // discard the record if the record offset is smaller or equal to server side offset if (record.kafkaOffset() > this.offsetPersistedInDoris.get() && record.kafkaOffset() > processedOffset.get()) { diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java index 0d76b32..a8fc00a 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java @@ -39,6 +39,7 @@ import org.apache.doris.kafka.connector.utils.FileNameUtils; import org.apache.doris.kafka.connector.writer.commit.DorisCommittable; import org.apache.doris.kafka.connector.writer.commit.DorisCommitter; import org.apache.doris.kafka.connector.writer.load.DorisStreamLoad; +import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,6 +140,12 @@ public class StreamLoadWriter extends DorisWriter { return label2Status; } + @Override + public void insert(SinkRecord record) { + initRecord(record); + insertRecord(record); + } + @Override public long getOffset() { if (committableList.isEmpty()) { diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java new 
package org.apache.doris.kafka.connector.writer.schema;

import com.google.common.annotations.VisibleForTesting;
import io.debezium.data.Envelope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.connection.ConnectionProvider;
import org.apache.doris.kafka.connector.converter.RecordDescriptor;
import org.apache.doris.kafka.connector.exception.SchemaChangeException;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.model.ColumnDescriptor;
import org.apache.doris.kafka.connector.model.TableDescriptor;
import org.apache.doris.kafka.connector.model.doris.Schema;
import org.apache.doris.kafka.connector.service.DorisSystemService;
import org.apache.doris.kafka.connector.service.RestService;
import org.apache.doris.kafka.connector.writer.DorisWriter;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DorisWriter} bound to the configured schema-change topic. Instead of loading data, it
 * consumes Debezium schema-change events and applies ADD COLUMN DDL to the matching Doris table
 * (via {@link SchemaChangeHelper} / {@link SchemaChangeManager}). It never buffers or flushes
 * rows, and tracks offsets only through {@code processedOffset}.
 */
public class DebeziumSchemaChange extends DorisWriter {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumSchemaChange.class);
    // topic -> doris table mapping from 'doris.topic2table.map'.
    private final Map<String, String> topic2TableMap;
    private SchemaChangeManager schemaChangeManager;
    private DorisSystemService dorisSystemService;
    // Doris tables this connector is configured to sink into; events for other tables are skipped.
    private Set<String> sinkTableSet;
    // DDL statements produced by the most recent alterTableIfNeeded() call.
    private List<String> ddlSqlList;

    public DebeziumSchemaChange(
            String topic,
            int partition,
            DorisOptions dorisOptions,
            ConnectionProvider connectionProvider,
            DorisConnectMonitor connectMonitor) {
        super(topic, partition, dorisOptions, connectionProvider, connectMonitor);
        // Marks this writer as a schema-change writer for the base class.
        this.schemaChange = true;
        this.sinkTableSet = new HashSet<>();
        this.dorisSystemService = new DorisSystemService(dorisOptions);
        this.topic2TableMap = dorisOptions.getTopicMap();
        this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
        init();
    }

    /** No stored offset to recover: schema-change events are tracked in memory only. */
    @Override
    public void fetchOffset() {
        // do nothing
    }

    // Collects the set of target table names from the topic -> table mapping.
    private void init() {
        Set<Map.Entry<String, String>> entrySet = topic2TableMap.entrySet();
        for (Map.Entry<String, String> entry : entrySet) {
            sinkTableSet.add(entry.getValue());
        }
    }

    /** Treats every incoming record as a potential schema-change event. */
    @Override
    public void insert(SinkRecord record) {
        schemaChange(record);
    }

    /** Nothing to commit: DDL is applied synchronously in insert(). */
    @Override
    public void commit(int partition) {
        // do nothing
    }

    /**
     * Parses a Debezium schema-change record and dispatches it. CREATE/DROP events are skipped
     * (only ALTER-style changes are handled); skipped records still advance processedOffset.
     */
    private void schemaChange(final SinkRecord record) {
        String tableName = resolveTableName(record);
        if (tableName == null) {
            LOG.warn(
                    "Ignored to write record from topic '{}' partition '{}' offset '{}'. No resolvable table name",
                    record.topic(),
                    record.kafkaPartition(),
                    record.kafkaOffset());
            processedOffset.set(record.kafkaOffset());
            return;
        }
        Struct recordStruct = (Struct) (record.value());
        // NOTE(review): tableChanges is assumed non-empty; an event with an empty array would
        // throw IndexOutOfBoundsException at get(0) — confirm Debezium guarantees this.
        List<Object> tableChanges = recordStruct.getArray("tableChanges");
        Struct tableChange = (Struct) tableChanges.get(0);
        if ("DROP".equalsIgnoreCase(tableChange.getString("type"))
                || "CREATE".equalsIgnoreCase(tableChange.getString("type"))) {
            LOG.warn(
                    "CREATE and DROP {} tables are currently not supported. Please create or drop them manually.",
                    tableName);
            processedOffset.set(record.kafkaOffset());
            return;
        }
        RecordDescriptor recordDescriptor =
                RecordDescriptor.builder()
                        .withSinkRecord(record)
                        .withTableChange(tableChange)
                        .build();
        tableChange(tableName, recordDescriptor);
    }

    /**
     * Extracts the source table name from the Debezium envelope, or returns null for tombstone
     * records (which carry no source field).
     */
    private String resolveTableName(SinkRecord record) {
        if (isTombstone(record)) {
            LOG.warn(
                    "Ignore this record because it seems to be a tombstone that doesn't have source field, then cannot resolve table name in topic '{}', partition '{}', offset '{}'",
                    record.topic(),
                    record.kafkaPartition(),
                    record.kafkaOffset());
            return null;
        }
        Struct source = ((Struct) record.value()).getStruct(Envelope.FieldName.SOURCE);
        return source.getString("table");
    }

    /**
     * Compares the live Doris schema with the event's fields and applies any resulting
     * ADD COLUMN DDL.
     *
     * @throws SchemaChangeException if the table does not exist in Doris.
     */
    private void alterTableIfNeeded(String tableName, RecordDescriptor record) {
        LOG.debug("Attempting to alter table '{}'.", tableName);
        if (!hasTable(tableName)) {
            LOG.error("Table '{}' does not exist and cannot be altered.", tableName);
            throw new SchemaChangeException("Could not find table: " + tableName);
        }
        final TableDescriptor dorisTableDescriptor = obtainTableSchema(tableName);
        // compareSchema() stores its result in SchemaChangeHelper's static state, which
        // generateDDLSql() then consumes — the call order here matters.
        SchemaChangeHelper.compareSchema(dorisTableDescriptor, record.getFields());
        ddlSqlList = SchemaChangeHelper.generateDDLSql(dorisOptions.getDatabase(), tableName);
        doSchemaChange(dorisOptions.getDatabase(), tableName);
    }

    /** Obtain table schema from doris. */
    private TableDescriptor obtainTableSchema(String tableName) {
        Schema schema = RestService.getSchema(dorisOptions, dbName, tableName, LOG);
        List<ColumnDescriptor> columnDescriptors = new ArrayList<>();
        schema.getProperties()
                .forEach(
                        column -> {
                            ColumnDescriptor columnDescriptor =
                                    ColumnDescriptor.builder()
                                            .columnName(column.getName())
                                            .typeName(column.getType())
                                            .comment(column.getComment())
                                            .build();
                            columnDescriptors.add(columnDescriptor);
                        });
        return TableDescriptor.builder()
                .tableName(tableName)
                .type(schema.getKeysType())
                .columns(columnDescriptors)
                .build();
    }

    // True when the table exists in the connector's target database (dbName from DorisWriter).
    private boolean hasTable(String tableName) {
        return dorisSystemService.tableExists(dbName, tableName);
    }

    /**
     * Applies the schema-change event to the given table: skipped when the table is not a
     * configured sink target or does not exist; otherwise delegates to alterTableIfNeeded().
     * Always advances processedOffset.
     */
    private void tableChange(String tableName, RecordDescriptor recordDescriptor) {
        if (!sinkTableSet.contains(tableName)) {
            processedOffset.set(recordDescriptor.getOffset());
            LOG.warn(
                    "The "
                            + tableName
                            + " is not defined and requires synchronized data. If you need to synchronize the table data, please configure it in 'doris.topic2table.map'");
            return;
        }

        if (!hasTable(tableName)) {
            // TODO Table does not exist, automatically created it.
            LOG.error("{} Table does not exist, please create manually.", tableName);
        } else {
            // Table exists, lets attempt to alter it if necessary.
            alterTableIfNeeded(tableName, recordDescriptor);
        }
        processedOffset.set(recordDescriptor.getOffset());
    }

    /**
     * Executes each pending DDL after a light-schema-change pre-check. Errors are logged, not
     * rethrown; returns the status of the last executed statement (false when the DDL list is
     * empty or every statement failed).
     */
    private boolean doSchemaChange(String database, String tableName) {
        boolean status = false;
        if (ddlSqlList.isEmpty()) {
            LOG.info("Schema change ddl is empty, not need do schema change.");
            return false;
        }
        try {
            // ddlSchemas and ddlSqlList are parallel lists built by generateDDLSql().
            List<SchemaChangeHelper.DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
            for (int i = 0; i < ddlSqlList.size(); i++) {
                SchemaChangeHelper.DDLSchema ddlSchema = ddlSchemas.get(i);
                String ddlSql = ddlSqlList.get(i);
                boolean doSchemaChange = checkSchemaChange(database, tableName, ddlSchema);
                status =
                        doSchemaChange
                                && schemaChangeManager.execute(ddlSql, dorisOptions.getDatabase());
                LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
            }
        } catch (Exception e) {
            LOG.warn("schema change error :", e);
        }
        return status;
    }

    // Asks the FE whether this column add/drop is allowed as a light schema change.
    private boolean checkSchemaChange(
            String database, String table, SchemaChangeHelper.DDLSchema ddlSchema)
            throws IllegalArgumentException, IOException {
        Map<String, Object> param =
                SchemaChangeManager.buildRequestParam(
                        ddlSchema.isDropColumn(), ddlSchema.getColumnName());
        return schemaChangeManager.checkSchemaChange(database, table, param);
    }

    /** Committed offset mirrors the last processed offset; nothing is buffered. */
    public long getOffset() {
        committedOffset.set(processedOffset.get());
        return committedOffset.get();
    }

    // Tombstone records carry a null value.
    private boolean isTombstone(SinkRecord record) {
        return record.value() == null;
    }

    @VisibleForTesting
    public void setSinkTableSet(Set<String> sinkTableSet) {
        this.sinkTableSet = sinkTableSet;
    }

    @VisibleForTesting
    public void setDorisSystemService(DorisSystemService dorisSystemService) {
        this.dorisSystemService = dorisSystemService;
    }

    @VisibleForTesting
    public List<String> getDdlSqlList() {
        return ddlSqlList;
    }

    @VisibleForTesting
    public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) {
        this.schemaChangeManager = schemaChangeManager;
    }
}
--git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java new file mode 100644 index 0000000..abf424c --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.doris.kafka.connector.writer.schema;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.kafka.connector.converter.RecordDescriptor;
import org.apache.doris.kafka.connector.model.ColumnDescriptor;
import org.apache.doris.kafka.connector.model.TableDescriptor;

/**
 * Computes the ALTER TABLE DDL needed to bring a Doris table in line with the upstream schema.
 *
 * <p>Usage protocol: call {@link #compareSchema} first, then {@link #generateDDLSql}; the
 * resulting DDL list and {@link #getDdlSchemas()} are parallel lists (index i of one describes
 * index i of the other).
 *
 * <p>NOTE(review): state is kept in static mutable lists that are cleared on each call — this is
 * not thread-safe if multiple tasks run schema changes concurrently in one JVM; confirm callers
 * serialize access.
 */
public class SchemaChangeHelper {
    // Columns detected by compareSchema() that must be added to the Doris table.
    private static final List<ColumnDescriptor> addColumnDescriptors = Lists.newArrayList();
    // Used to determine whether the column in the doris table can undergo schema change
    private static final List<DDLSchema> ddlSchemas = Lists.newArrayList();
    // Format: ALTER TABLE `db`.`table` ADD COLUMN `col` TYPE
    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";

    // TODO support drop column
    // Dropping a column is a dangerous behavior and may result in an accidental deletion.
    // There are some problems in the current implementation: each alter column operation will read
    // the table structure
    // in doris and compare the schema with the topic message.
    // When there are more columns in the doris table than in the upstream table,
    // these redundant columns in doris will be dropped, regardless of these redundant columns, is
    // what you need.
    // Therefore, the operation of dropping a column behavior currently requires the user to do it
    // himself.
    private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";

    /**
     * Compare kafka upstream table structure with doris table structure. If kafka field does not
     * contain the structure of dorisTable, then need to add this field.
     *
     * @param dorisTable read from the table schema of doris.
     * @param fields table structure from kafka upstream data source.
     */
    public static void compareSchema(
            TableDescriptor dorisTable, Map<String, RecordDescriptor.FieldDescriptor> fields) {
        // Determine whether fields need to be added to doris table
        addColumnDescriptors.clear();
        Collection<ColumnDescriptor> dorisTableColumns = dorisTable.getColumns();
        Set<String> dorisTableColumnNames =
                dorisTableColumns.stream()
                        .map(ColumnDescriptor::getColumnName)
                        .collect(Collectors.toSet());
        Set<Map.Entry<String, RecordDescriptor.FieldDescriptor>> fieldsEntries = fields.entrySet();
        for (Map.Entry<String, RecordDescriptor.FieldDescriptor> fieldEntry : fieldsEntries) {
            String fieldName = fieldEntry.getKey();
            if (!dorisTableColumnNames.contains(fieldName)) {
                // Upstream column missing in Doris: schedule it for ADD COLUMN.
                RecordDescriptor.FieldDescriptor fieldDescriptor = fieldEntry.getValue();
                ColumnDescriptor columnDescriptor =
                        new ColumnDescriptor.Builder()
                                .columnName(fieldDescriptor.getName())
                                .typeName(fieldDescriptor.getSchemaTypeName())
                                .defaultValue(fieldDescriptor.getDefaultValue())
                                .comment(fieldDescriptor.getComment())
                                .build();
                addColumnDescriptors.add(columnDescriptor);
            }
        }
    }

    /**
     * Renders one ADD COLUMN statement per column recorded by the preceding compareSchema() call,
     * and rebuilds the parallel ddlSchemas list.
     */
    public static List<String> generateDDLSql(String database, String table) {
        ddlSchemas.clear();
        List<String> ddlList = Lists.newArrayList();
        for (ColumnDescriptor columnDescriptor : addColumnDescriptors) {
            ddlList.add(buildAddColumnDDL(database, table, columnDescriptor));
            ddlSchemas.add(new DDLSchema(columnDescriptor.getColumnName(), false));
        }
        return ddlList;
    }

    /** Returns the DDLSchema list parallel to the last generateDDLSql() result. */
    public static List<DDLSchema> getDdlSchemas() {
        return ddlSchemas;
    }

    // Currently unused: drop-column DDL is kept for the future (see TODO above).
    private static String buildDropColumnDDL(String database, String tableName, String columName) {
        return String.format(
                DROP_DDL,
                identifier(database) + "." + identifier(tableName),
                identifier(columName));
    }

    // Builds: ALTER TABLE `db`.`table` ADD COLUMN `col` TYPE [DEFAULT v] [COMMENT 'c']
    private static String buildAddColumnDDL(
            String database, String tableName, ColumnDescriptor columnDescriptor) {
        String columnName = columnDescriptor.getColumnName();
        String columnType = columnDescriptor.getTypeName();
        String defaultValue = columnDescriptor.getDefaultValue();
        String comment = columnDescriptor.getComment();
        String addDDL =
                String.format(
                        ADD_DDL,
                        identifier(database) + "." + identifier(tableName),
                        identifier(columnName),
                        columnType);
        if (defaultValue != null) {
            addDDL = addDDL + " DEFAULT " + quoteDefaultValue(defaultValue);
        }
        if (StringUtils.isNotEmpty(comment)) {
            addDDL = addDDL + " COMMENT '" + quoteComment(comment) + "'";
        }
        return addDDL;
    }

    // Backtick-quotes a database/table/column identifier.
    private static String identifier(String name) {
        return "`" + name + "`";
    }

    private static String quoteDefaultValue(String defaultValue) {
        // DEFAULT current_timestamp not need quote
        if (defaultValue.equalsIgnoreCase("current_timestamp")) {
            return defaultValue;
        }
        return "'" + defaultValue + "'";
    }

    // Escapes single quotes so the comment survives being embedded in '...'.
    private static String quoteComment(String comment) {
        return comment.replaceAll("'", "\\\\'");
    }

    /** Pairs a column name with whether its pending DDL is a drop (vs add). */
    public static class DDLSchema {
        private final String columnName;
        private final boolean isDropColumn;

        public DDLSchema(String columnName, boolean isDropColumn) {
            this.columnName = columnName;
            this.isDropColumn = isDropColumn;
        }

        public String getColumnName() {
            return columnName;
        }

        public boolean isDropColumn() {
            return isDropColumn;
        }
    }
}
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.writer.schema; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringUtils; +import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.exception.SchemaChangeException; +import org.apache.doris.kafka.connector.utils.HttpGetWithEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SchemaChangeManager implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class); + private static final String 
CHECK_SCHEMA_CHANGE_API = + "http://%s/api/enable_light_schema_change/%s/%s"; + private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; + private final ObjectMapper objectMapper = new ObjectMapper(); + private final DorisOptions dorisOptions; + + public SchemaChangeManager(DorisOptions dorisOptions) { + this.dorisOptions = dorisOptions; + } + + public static Map<String, Object> buildRequestParam(boolean dropColumn, String columnName) { + Map<String, Object> params = new HashMap<>(); + params.put("isDropColumn", dropColumn); + params.put("columnName", columnName); + return params; + } + + /** check ddl can do light schema change. */ + public boolean checkSchemaChange(String database, String table, Map<String, Object> params) + throws IllegalArgumentException, IOException { + if (params.isEmpty()) { + return false; + } + String requestUrl = + String.format(CHECK_SCHEMA_CHANGE_API, dorisOptions.getHttpUrl(), database, table); + HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl); + httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); + httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params))); + String responseEntity = ""; + Map<String, Object> responseMap = handleResponse(httpGet, responseEntity); + return handleSchemaChange(responseMap, responseEntity); + } + + private boolean handleSchemaChange(Map<String, Object> responseMap, String responseEntity) { + String code = responseMap.getOrDefault("code", "-1").toString(); + if (code.equals("0")) { + return true; + } else { + throw new SchemaChangeException("Failed to schemaChange, response: " + responseEntity); + } + } + + /** execute sql in doris. 
*/ + public boolean execute(String ddl, String database) + throws IOException, IllegalArgumentException { + if (StringUtils.isEmpty(ddl)) { + return false; + } + LOG.info("Execute SQL: {}", ddl); + HttpPost httpPost = buildHttpPost(ddl, database); + String responseEntity = ""; + Map<String, Object> responseMap = handleResponse(httpPost, responseEntity); + return handleSchemaChange(responseMap, responseEntity); + } + + public HttpPost buildHttpPost(String ddl, String database) + throws IllegalArgumentException, IOException { + Map<String, String> param = new HashMap<>(); + param.put("stmt", ddl); + String requestUrl = String.format(SCHEMA_CHANGE_API, dorisOptions.getHttpUrl(), database); + HttpPost httpPost = new HttpPost(requestUrl); + httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); + httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); + return httpPost; + } + + private Map<String, Object> handleResponse(HttpUriRequest request, String responseEntity) { + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + CloseableHttpResponse response = httpclient.execute(request); + final int statusCode = response.getStatusLine().getStatusCode(); + final String reasonPhrase = response.getStatusLine().getReasonPhrase(); + if (statusCode == 200 && response.getEntity() != null) { + responseEntity = EntityUtils.toString(response.getEntity()); + return objectMapper.readValue(responseEntity, Map.class); + } else { + throw new SchemaChangeException( + "Failed to schemaChange, status: " + + statusCode + + ", reason: " + + reasonPhrase); + } + } catch (Exception e) { + LOG.error("SchemaChange request error,", e); + throw new SchemaChangeException("SchemaChange request error with " + e.getMessage()); + } + } + + private String authHeader() { + return "Basic " + + new String( + Base64.encodeBase64( + (dorisOptions.getUser() + ":" + dorisOptions.getPassword()) + 
.getBytes(StandardCharsets.UTF_8))); + } +} diff --git a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java index f1970bd..3cff4fa 100644 --- a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java +++ b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java @@ -75,14 +75,14 @@ public class TestRecordService { String noDeleteValue = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i [...] String expectedNoDeleteValue = - "{\"id\":8,\"name\":\"Jfohn Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"'10:30:00'\",\"text_column\":\"Lorem ipsum dolor sit amet, consectetur adipiscing elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_D [...] + "{\"id\":8,\"name\":\"Jfohn Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"10:30:00\",\"text_column\":\"Lorem ipsum dolor sit amet, consectetur adipiscing elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_DEL [...] 
buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue); // delete value String deleteValue = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i [...] String expectedDeleteValue = - "{\"id\":8,\"name\":\"Jfohn Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"'10:30:00'\",\"text_column\":\"Lorem ipsum dolor sit amet, consectetur adipiscing elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_D [...] + "{\"id\":8,\"name\":\"Jfohn Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"10:30:00\",\"text_column\":\"Lorem ipsum dolor sit amet, consectetur adipiscing elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_DEL [...] 
buildProcessStructRecord(topic, deleteValue, expectedDeleteValue); } diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java new file mode 100644 index 0000000..c95af44 --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.doris.kafka.connector.writer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider; +import org.apache.doris.kafka.connector.exception.DorisException; +import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor; +import org.apache.doris.kafka.connector.model.doris.Schema; +import org.apache.doris.kafka.connector.service.DorisSystemService; +import org.apache.doris.kafka.connector.service.RestService; +import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange; +import org.apache.doris.kafka.connector.writer.schema.SchemaChangeManager; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +public class TestDebeziumSchemaChange { + private final ObjectMapper objectMapper = new ObjectMapper(); + private final JsonConverter jsonConverter = new JsonConverter(); + private final HashSet<String> sinkTableSet = new HashSet<>(); + private DebeziumSchemaChange debeziumSchemaChange; + private DorisOptions dorisOptions; + private String topic; + private MockedStatic<RestService> mockRestService; + + @Before + 
public void init() throws IOException { + InputStream stream = + this.getClass() + .getClassLoader() + .getResourceAsStream("doris-connector-sink.properties"); + Properties props = new Properties(); + props.load(stream); + props.put("task_id", "1"); + props.put("name", "sink-connector-test"); + topic = "normal"; + dorisOptions = new DorisOptions((Map) props); + DorisConnectMonitor dorisConnectMonitor = mock(DorisConnectMonitor.class); + DorisSystemService mockDorisSystemService = mock(DorisSystemService.class); + jsonConverter.configure(new HashMap<>(), false); + mockRestService = mockStatic(RestService.class); + SchemaChangeManager mockSchemaChangeManager = Mockito.mock(SchemaChangeManager.class); + Mockito.when( + mockSchemaChangeManager.checkSchemaChange( + Mockito.any(), Mockito.any(), Mockito.any())) + .thenReturn(true); + debeziumSchemaChange = + new DebeziumSchemaChange( + topic, + 0, + dorisOptions, + new JdbcConnectionProvider(dorisOptions), + dorisConnectMonitor); + when(mockDorisSystemService.tableExists(anyString(), anyString())).thenReturn(true); + + sinkTableSet.add("normal_time"); + debeziumSchemaChange.setSchemaChangeManager(mockSchemaChangeManager); + debeziumSchemaChange.setSinkTableSet(sinkTableSet); + debeziumSchemaChange.setDorisSystemService(mockDorisSystemService); + } + + @Test + public void testAlterSchemaChange() { + String alterTopicMsg = + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"}, [...] 
+ SchemaAndValue schemaAndValue = + jsonConverter.toConnectData(topic, alterTopicMsg.getBytes(StandardCharsets.UTF_8)); + SinkRecord sinkRecord = + TestRecordBuffer.newSinkRecord(schemaAndValue.value(), 8, schemaAndValue.schema()); + String normalTimeSchemaStr = + "{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"timestamp_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"datetime_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"date_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV2\"}],\"status\":200}"; + Schema normalTimeSchema = null; + try { + normalTimeSchema = objectMapper.readValue(normalTimeSchemaStr, Schema.class); + } catch (JsonProcessingException e) { + throw new DorisException(e); + } + mockRestService + .when(() -> RestService.getSchema(any(), any(), any(), any())) + .thenReturn(normalTimeSchema); + + debeziumSchemaChange.insert(sinkRecord); + List<String> ddlSqlList = debeziumSchemaChange.getDdlSqlList(); + Assert.assertEquals( + ddlSqlList.get(0), + "ALTER TABLE `test_db`.`normal_time` ADD COLUMN `time_test` STRING DEFAULT '12:00'"); + } + + @After + public void close() { + mockRestService.close(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org