This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 857bae3 [improvement] update create table modify string key to varchar (#166)
857bae3 is described below
commit 857bae39e9850a63261a246ccbb99396c6c68358
Author: wudi <[email protected]>
AuthorDate: Thu Aug 3 17:58:54 2023 +0800
[improvement] update create table modify string key to varchar (#166)
---
.../org/apache/doris/flink/catalog/doris/DorisSystem.java | 12 ++++++++----
.../java/org/apache/doris/flink/tools/cdc/SourceSchema.java | 6 +++++-
2 files changed, 13 insertions(+), 5 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index c0e9daa..b635a96 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -155,7 +155,7 @@ public class DorisSystem {
throw new CreateTableException("key " + key + " not found in column list");
}
FieldSchema field = fields.get(key);
- buildColumn(sb, field);
+ buildColumn(sb, field, true);
}
//append values
@@ -164,7 +164,7 @@ public class DorisSystem {
continue;
}
FieldSchema field = entry.getValue();
- buildColumn(sb, field);
+ buildColumn(sb, field, false);
}
sb = sb.deleteCharAt(sb.length() -1);
@@ -210,10 +210,14 @@ public class DorisSystem {
return sb.toString();
}
- private void buildColumn(StringBuilder sql, FieldSchema field){
+ private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){
+ String fieldType = field.getTypeString();
+ if(isKey && DorisType.STRING.equals(fieldType)){
+ fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
+ }
sql.append(identifier(field.getName()))
.append(" ")
- .append(field.getTypeString())
+ .append(fieldType)
.append(" COMMENT '")
.append(quoteComment(field.getComment()))
.append("',");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index c539a75..9168cb5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -78,13 +78,17 @@ public abstract class SourceSchema {
TableSchema tableSchema = new TableSchema();
tableSchema.setModel(this.model);
tableSchema.setFields(this.fields);
- tableSchema.setKeys(this.primaryKeys);
+ tableSchema.setKeys(buildKeys());
tableSchema.setTableComment(this.tableComment);
tableSchema.setDistributeKeys(buildDistributeKeys());
tableSchema.setProperties(tableProps);
return tableSchema;
}
+ private List<String> buildKeys(){
+ return buildDistributeKeys();
+ }
+
private List<String> buildDistributeKeys(){
if(!this.primaryKeys.isEmpty()){
return primaryKeys;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]