JNSimba commented on code in PR #354:
URL:
https://github.com/apache/doris-flink-connector/pull/354#discussion_r1542220855
##########
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java:
##########
@@ -365,13 +368,22 @@ public void fillOriginSchema(String tableName, JsonNode columns) {
}
}
} else {
- LOG.error(
- "Current schema change failed! You need to ensure that "
- + "there is data in the table."
- + dorisOptions.getTableIdentifier());
+ // In order to be compatible with column changes, the data is empty or started from
+ // flink checkpoint, resulting in the originFieldSchemaMap not being filled.
+ LOG.info(tableName + " fill origin field schema from doris schema.");
fieldSchemaMap = new LinkedHashMap<>();
- Map<String, FieldSchema> finalFieldSchemaMap = fieldSchemaMap;
- columns.forEach(column -> buildFieldSchema(finalFieldSchemaMap, column));
+ String[] splitTableName = tableName.split("\\.");
+ Schema schema =
+ RestService.getSchema(dorisOptions, splitTableName[0], splitTableName[1], LOG);
+ List<Field> columnFields = schema.getProperties();
+ for (Field column : columnFields) {
+ String columnName = column.getName();
+ String columnType = column.getType();
+ String columnComment = column.getComment();
+ // TODO need to fill column with default value
+ fieldSchemaMap.put(
+ columnName, new FieldSchema(columnName, columnType, "", columnComment));
Review Comment:
Can the `FieldSchema(String name, String typeString, String comment)`
constructor be used here?
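
To make the suggestion concrete, here is a minimal, self-contained sketch of what the loop body might look like with the three-argument constructor. The `FieldSchema` class below is a hypothetical stand-in written for this example, not the connector's class, and it assumes the three-argument form leaves the default value unset (`null`) rather than `""`. The helper `fromDorisColumn` and the class name `FieldSchemaSketch` are also made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldSchemaSketch {

    /** Hypothetical stand-in for the connector's FieldSchema; not the real class. */
    static class FieldSchema {
        final String name;
        final String typeString;
        final String defaultValue;
        final String comment;

        // Three-argument form named in the review comment.
        // Assumption: it leaves the default value unset (null).
        FieldSchema(String name, String typeString, String comment) {
            this(name, typeString, null, comment);
        }

        // Four-argument form, as called in the diff with "" as the default value.
        FieldSchema(String name, String typeString, String defaultValue, String comment) {
            this.name = name;
            this.typeString = typeString;
            this.defaultValue = defaultValue;
            this.comment = comment;
        }
    }

    /** Builds one entry the way the loop could with the shorter constructor. */
    static FieldSchema fromDorisColumn(
            String columnName, String columnType, String columnComment) {
        return new FieldSchema(columnName, columnType, columnComment);
    }

    public static void main(String[] args) {
        Map<String, FieldSchema> fieldSchemaMap = new LinkedHashMap<>();
        fieldSchemaMap.put("id", fromDorisColumn("id", "INT", "primary key"));

        FieldSchema id = fieldSchemaMap.get("id");
        System.out.println(id.name + " " + id.typeString + " default=" + id.defaultValue);
    }
}
```

If the real three-argument constructor behaves like this stand-in, the default value changes from `""` to `null`, so it is worth checking that nothing downstream treats the two differently before switching.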
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]