This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 35771549 [improve] support modify column type/comment, support auto
create database if not exists (#408)
35771549 is described below
commit 35771549180bed4981689071de000fd6e2b9e080
Author: North Lin <[email protected]>
AuthorDate: Wed Jun 26 14:19:51 2024 +0800
[improve] support modify column type/comment, support auto create database
if not exists (#408)
---
.../flink/sink/schema/SchemaChangeHelper.java | 67 ++++++++++--
.../flink/sink/schema/SchemaChangeManager.java | 75 ++++++++++++-
.../flink/sink/schema/SchemaManagerITCase.java | 121 +++++++++++++++++----
3 files changed, 230 insertions(+), 33 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index 4c29c348..8d365ffc 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -29,6 +29,8 @@ import java.util.Map;
import java.util.Map.Entry;
public class SchemaChangeHelper {
+ public static final String DEFAULT_DATABASE = "information_schema";
+
private static final List<String> dropFieldSchemas = Lists.newArrayList();
private static final List<FieldSchema> addFieldSchemas =
Lists.newArrayList();
// Used to determine whether the doris table supports ddl
@@ -38,6 +40,11 @@ public class SchemaChangeHelper {
private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s
%s";
private static final String CHECK_COLUMN_EXISTS =
"SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE
TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s'";
+ private static final String CHECK_DATABASE_EXISTS =
+ "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA` WHERE
SCHEMA_NAME = '%s'";
+ private static final String CREATE_DATABASE_DDL = "CREATE DATABASE IF NOT
EXISTS %s";
+ private static final String MODIFY_TYPE_DDL = "ALTER TABLE %s MODIFY
COLUMN %s %s";
+ private static final String MODIFY_COMMENT_DDL = "ALTER TABLE %s MODIFY
COLUMN %s COMMENT '%s'";
public static void compareSchema(
Map<String, FieldSchema> updateFiledSchemaMap,
@@ -104,19 +111,18 @@ public class SchemaChangeHelper {
String type = fieldSchema.getTypeString();
String defaultValue = fieldSchema.getDefaultValue();
String comment = fieldSchema.getComment();
- String addDDL =
- String.format(
- ADD_DDL,
- DorisSystem.quoteTableIdentifier(tableIdentifier),
- DorisSystem.identifier(name),
- type);
+ StringBuilder addDDL =
+ new StringBuilder(
+ String.format(
+ ADD_DDL,
+
DorisSystem.quoteTableIdentifier(tableIdentifier),
+ DorisSystem.identifier(name),
+ type));
if (defaultValue != null) {
- addDDL = addDDL + " DEFAULT " +
DorisSystem.quoteDefaultValue(defaultValue);
- }
- if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
- addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment)
+ "'";
+ addDDL.append(" DEFAULT
").append(DorisSystem.quoteDefaultValue(defaultValue));
}
- return addDDL;
+ commentColumn(addDDL, comment);
+ return addDDL.toString();
}
public static String buildDropColumnDDL(String tableIdentifier, String
columName) {
@@ -139,6 +145,45 @@ public class SchemaChangeHelper {
return String.format(CHECK_COLUMN_EXISTS, database, table, column);
}
+ public static String buildDatabaseExistsQuery(String database) {
+ return String.format(CHECK_DATABASE_EXISTS, database);
+ }
+
+ public static String buildCreateDatabaseDDL(String database) {
+ return String.format(CREATE_DATABASE_DDL, database);
+ }
+
+ public static String buildModifyColumnCommentDDL(
+ String tableIdentifier, String columnName, String newComment) {
+ return String.format(
+ MODIFY_COMMENT_DDL,
+ DorisSystem.quoteTableIdentifier(tableIdentifier),
+ DorisSystem.identifier(columnName),
+ DorisSystem.quoteComment(newComment));
+ }
+
+ public static String buildModifyColumnDataTypeDDL(
+ String tableIdentifier, FieldSchema fieldSchema) {
+ String columnName = fieldSchema.getName();
+ String dataType = fieldSchema.getTypeString();
+ String comment = fieldSchema.getComment();
+ StringBuilder modifyDDL =
+ new StringBuilder(
+ String.format(
+ MODIFY_TYPE_DDL,
+
DorisSystem.quoteTableIdentifier(tableIdentifier),
+ DorisSystem.identifier(columnName),
+ dataType));
+ commentColumn(modifyDDL, comment);
+ return modifyDDL.toString();
+ }
+
+ private static void commentColumn(StringBuilder ddl, String comment) {
+ if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+ ddl.append(" COMMENT
'").append(DorisSystem.quoteComment(comment)).append("'");
+ }
+ }
+
public static List<DDLSchema> getDdlSchemas() {
return ddlSchemas;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
index d2bacf26..27f2aece 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -30,6 +30,8 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.DorisSchemaChangeException;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Field;
+import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -47,6 +49,7 @@ import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
public class SchemaChangeManager implements Serializable {
private static final long serialVersionUID = 1L;
@@ -68,6 +71,12 @@ public class SchemaChangeManager implements Serializable {
}
public boolean createTable(TableSchema table) throws IOException,
IllegalArgumentException {
+ // auto create database if not exists
+ if (!checkDatabaseExists(table.getDatabase())) {
+ execute(
+
SchemaChangeHelper.buildCreateDatabaseDDL(table.getDatabase()),
+ SchemaChangeHelper.DEFAULT_DATABASE);
+ }
String createTableDDL = DorisSystem.buildCreateTableDDL(table);
return execute(createTableDDL, table.getDatabase());
}
@@ -109,6 +118,54 @@ public class SchemaChangeManager implements Serializable {
database, table, buildRequestParam(true, oldColumnName),
renameColumnDDL);
}
+ public boolean modifyColumnDataType(String database, String table,
FieldSchema field)
+ throws IOException, IllegalArgumentException {
+ if (!checkColumnExists(database, table, field.getName())) {
+ LOG.warn(
+ "The column {} is not exists in table {}, can not modify
it type",
+ field.getName(),
+ table);
+ return false;
+ }
+ // If user does not give a comment, need fill it from
+ // original table schema to avoid miss comment
+ if (StringUtils.isNullOrWhitespaceOnly(field.getComment())) {
+ Schema tableSchema = getTableSchema(database, table);
+ Optional<Field> originalField =
+ tableSchema.getProperties().stream()
+ .filter(column ->
column.getName().equals(field.getName()))
+ .findAny();
+ originalField.ifPresent(oldField ->
field.setComment(oldField.getComment()));
+ }
+ String tableIdentifier = getTableIdentifier(database, table);
+ String modifyColumnDDL =
+
SchemaChangeHelper.buildModifyColumnDataTypeDDL(tableIdentifier, field);
+ return schemaChange(
+ database, table, buildRequestParam(false, field.getName()),
modifyColumnDDL);
+ }
+
+ public boolean modifyColumnComment(
+ String database, String table, String columnName, String
newComment)
+ throws IOException, IllegalArgumentException {
+ if (!checkColumnExists(database, table, columnName)) {
+ LOG.warn(
+ "The column {} is not exists in table {}, can not modify
it's comment",
+ columnName,
+ table);
+ return false;
+ }
+ String tableIdentifier = getTableIdentifier(database, table);
+ String modifyColumnCommentDDL =
+ SchemaChangeHelper.buildModifyColumnCommentDDL(
+ tableIdentifier, columnName, newComment);
+ return schemaChange(
+ database, table, buildRequestParam(false, columnName),
modifyColumnCommentDDL);
+ }
+
+ public Schema getTableSchema(String database, String table) {
+ return RestService.getSchema(dorisOptions, database, table, LOG);
+ }
+
public boolean schemaChange(
String database, String table, Map<String, Object> params, String
sql)
throws IOException, IllegalArgumentException {
@@ -215,7 +272,18 @@ public class SchemaChangeManager implements Serializable {
public boolean checkColumnExists(String database, String table, String
columnName)
throws IllegalArgumentException, IOException {
String existsQuery =
SchemaChangeHelper.buildColumnExistsQuery(database, table, columnName);
- HttpPost httpPost = buildHttpPost(existsQuery, database);
+ return sendHttpPostRequest(existsQuery, database);
+ }
+
+ private boolean checkDatabaseExists(String database)
+ throws IllegalArgumentException, IOException {
+ String existsQuery =
SchemaChangeHelper.buildDatabaseExistsQuery(database);
+ return sendHttpPostRequest(existsQuery,
SchemaChangeHelper.DEFAULT_DATABASE);
+ }
+
+ private boolean sendHttpPostRequest(String sql, String database)
+ throws IOException, IllegalArgumentException {
+ HttpPost httpPost = buildHttpPost(sql, database);
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(httpPost);
final int statusCode = response.getStatusLine().getStatusCode();
@@ -231,7 +299,10 @@ public class SchemaChangeManager implements Serializable {
}
}
} catch (Exception e) {
- LOG.error("check column exist request error {}, default return
false", e.getMessage());
+ LOG.error(
+ "send http post request error {}, default return false,
SQL:{}",
+ e.getMessage(),
+ sql);
}
return false;
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
index 8d2a9b0d..3dde08d5 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
@@ -18,9 +18,13 @@
package org.apache.doris.flink.sink.schema;
import org.apache.doris.flink.DorisTestBase;
+import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.rest.models.Field;
+import org.apache.doris.flink.rest.models.Schema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -28,11 +32,13 @@ import org.junit.Test;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
public class SchemaManagerITCase extends DorisTestBase {
@@ -87,7 +93,7 @@ public class SchemaManagerITCase extends DorisTestBase {
@Test
public void testAddColumnWithChineseComment()
- throws SQLException, IOException, IllegalArgumentException {
+ throws SQLException, IOException, IllegalArgumentException,
InterruptedException {
String addColumnTbls = "add_column";
initDorisSchemaChangeTable(addColumnTbls);
@@ -105,7 +111,7 @@ public class SchemaManagerITCase extends DorisTestBase {
private void addColumnWithChineseCommentAndAssert(
String tableName, String addColumnName, String chineseComment,
boolean assertFlag)
- throws SQLException, IOException, IllegalArgumentException {
+ throws IOException, IllegalArgumentException, InterruptedException
{
FieldSchema field = new FieldSchema(addColumnName, "string",
chineseComment);
schemaChangeManager.addColumn(DATABASE, tableName, field);
boolean exists = schemaChangeManager.addColumn(DATABASE, tableName,
field);
@@ -115,28 +121,31 @@ public class SchemaManagerITCase extends DorisTestBase {
Assert.assertTrue(exists);
// check Chinese comment
- Map<String, String> columnComments = getColumnComments(tableName);
+ Thread.sleep(3_000);
+ String comment = getColumnComment(tableName, addColumnName);
if (assertFlag) {
- Assert.assertEquals(columnComments.get(addColumnName),
chineseComment);
+ Assert.assertEquals(comment, chineseComment);
} else {
- Assert.assertNotEquals(columnComments.get(addColumnName),
chineseComment);
+ Assert.assertNotEquals(comment, chineseComment);
}
}
- private Map<String, String> getColumnComments(String table) throws
SQLException {
- Map<String, String> columnCommentsMap = new HashMap<>();
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD)) {
- ResultSet columns = connection.getMetaData().getColumns(null,
DATABASE, table, null);
-
- while (columns.next()) {
- String columnName = columns.getString("COLUMN_NAME");
- String comment = columns.getString("REMARKS");
- columnCommentsMap.put(columnName, comment);
- }
- }
- return columnCommentsMap;
+ private String getColumnComment(String table, String columnName) {
+ Schema schema = schemaChangeManager.getTableSchema(DATABASE, table);
+ Optional<Field> first =
+ schema.getProperties().stream()
+ .filter(col -> col.getName().equals(columnName))
+ .findFirst();
+ return first.map(Field::getComment).orElse(null);
+ }
+
+ private String getColumnType(String table, String columnName) {
+ Schema schema = schemaChangeManager.getTableSchema(DATABASE, table);
+ Optional<Field> first =
+ schema.getProperties().stream()
+ .filter(col -> col.getName().equals(columnName))
+ .findFirst();
+ return first.map(Field::getType).orElse(null);
}
@Test
@@ -162,4 +171,76 @@ public class SchemaManagerITCase extends DorisTestBase {
exists = schemaChangeManager.checkColumnExists(DATABASE,
renameColumnTbls, "age");
Assert.assertFalse(exists);
}
+
+ @Test
+ public void testModifyColumnComment()
+ throws SQLException, IOException, IllegalArgumentException {
+ String modifyColumnCommentTbls = "modify_column_comment";
+ initDorisSchemaChangeTable(modifyColumnCommentTbls);
+ String columnName = "age";
+ String newComment = "new comment of age";
+ schemaChangeManager.modifyColumnComment(
+ DATABASE, modifyColumnCommentTbls, columnName, newComment);
+
+ String comment = getColumnComment(modifyColumnCommentTbls, columnName);
+ Assert.assertEquals(newComment, comment);
+ }
+
+ @Test
+ public void testOnlyModifyColumnType()
+ throws SQLException, IOException, IllegalArgumentException,
InterruptedException {
+ String modifyColumnTbls = "modify_column_type";
+ String columnName = "age";
+ String newColumnType = "bigint";
+ initDorisSchemaChangeTable(modifyColumnTbls);
+ FieldSchema field = new FieldSchema(columnName, newColumnType, "");
+ schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls,
field);
+
+ Thread.sleep(3_000);
+ String columnType = getColumnType(modifyColumnTbls, columnName);
+ Assert.assertEquals(newColumnType, columnType.toLowerCase());
+ }
+
+ @Test
+ public void testModifyColumnTypeAndComment()
+ throws SQLException, IOException, IllegalArgumentException,
InterruptedException {
+ String modifyColumnTbls = "modify_column_type_and_comment";
+ initDorisSchemaChangeTable(modifyColumnTbls);
+ String columnName = "age";
+ String newColumnType = "bigint";
+ String newComment = "new comment of age";
+ FieldSchema field = new FieldSchema(columnName, newColumnType,
newComment);
+ schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls,
field);
+
+ Thread.sleep(3_000);
+ String comment = getColumnComment(modifyColumnTbls, columnName);
+ Assert.assertEquals(newComment, comment);
+
+ String columnType = getColumnType(modifyColumnTbls, columnName);
+ Assert.assertEquals(newColumnType, columnType.toLowerCase());
+ }
+
+ @Test
+ public void testCreateTableWhenDatabaseNotExists()
+ throws IOException, IllegalArgumentException, InterruptedException
{
+ String databaseName = DATABASE + "_" + Integer.toUnsignedString(new
Random().nextInt(), 36);
+ String tableName = "auto_create_database";
+
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setDatabase(databaseName);
+ tableSchema.setTable(tableName);
+ Map<String, FieldSchema> fields = new HashMap<>();
+ fields.put("id", new FieldSchema("id", "varchar(32)", ""));
+ fields.put("age", new FieldSchema("age", "int", ""));
+ tableSchema.setFields(fields);
+ tableSchema.setDistributeKeys(Collections.singletonList("id"));
+ tableSchema.setModel(DataModel.DUPLICATE);
+ Map<String, String> tableProperties = new HashMap<>();
+ tableProperties.put("replication_num", "1");
+ tableSchema.setProperties(tableProperties);
+ schemaChangeManager.createTable(tableSchema);
+
+ Thread.sleep(3_000);
+ Assert.assertNotNull(schemaChangeManager.getTableSchema(databaseName,
tableName));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org