DongLiang-0 commented on code in PR #272:
URL:
https://github.com/apache/doris-flink-connector/pull/272#discussion_r1427488506
##########
flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java:
##########
@@ -111,65 +116,74 @@ private JsonNode convertToJson(Schema schema, Object
value) throws DorisExceptio
} else if (value instanceof BigDecimal) {
return JSON_NODE_FACTORY.numberNode((BigDecimal)
value);
} else {
- throw new DorisException("Invalid type for bytes type:
" + value.getClass());
+ throw new DorisException(
+ "Invalid type for bytes type: " +
value.getClass());
}
- case ARRAY: {
- Collection<?> collection = (Collection<?>) value;
- ArrayNode list = JSON_NODE_FACTORY.arrayNode();
- for (Object elem : collection) {
- Schema valueSchema = schema == null ? null :
schema.valueSchema();
- JsonNode fieldValue = convertToJson(valueSchema, elem);
- list.add(fieldValue);
+ case ARRAY:
+ {
+ Collection<?> collection = (Collection<?>) value;
+ ArrayNode list = JSON_NODE_FACTORY.arrayNode();
+ for (Object elem : collection) {
+ Schema valueSchema = schema == null ? null :
schema.valueSchema();
+ JsonNode fieldValue = convertToJson(valueSchema,
elem);
+ list.add(fieldValue);
+ }
+ return list;
}
- return list;
- }
- case MAP: {
- Map<?, ?> map = (Map<?, ?>) value;
- // If true, using string keys and JSON object; if false,
using non-string keys and Array-encoding
- boolean objectMode;
- if (schema == null) {
- objectMode = true;
- for (Map.Entry<?, ?> entry : map.entrySet()) {
- if (!(entry.getKey() instanceof String)) {
- objectMode = false;
- break;
+ case MAP:
+ {
+ Map<?, ?> map = (Map<?, ?>) value;
+ // If true, using string keys and JSON object; if
false, using non-string
+ // keys and Array-encoding
+ boolean objectMode;
+ if (schema == null) {
+ objectMode = true;
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ if (!(entry.getKey() instanceof String)) {
+ objectMode = false;
+ break;
+ }
}
+ } else {
+ objectMode = schema.keySchema().type() ==
Schema.Type.STRING;
}
- } else {
- objectMode = schema.keySchema().type() ==
Schema.Type.STRING;
- }
- ObjectNode obj = null;
- ArrayNode list = null;
- if (objectMode) {
- obj = JSON_NODE_FACTORY.objectNode();
- } else {
- list = JSON_NODE_FACTORY.arrayNode();
- }
- for (Map.Entry<?, ?> entry : map.entrySet()) {
- Schema keySchema = schema == null ? null :
schema.keySchema();
- Schema valueSchema = schema == null ? null :
schema.valueSchema();
- JsonNode mapKey = convertToJson(keySchema,
entry.getKey());
- JsonNode mapValue = convertToJson(valueSchema,
entry.getValue());
-
+ ObjectNode obj = null;
+ ArrayNode list = null;
if (objectMode) {
- obj.set(mapKey.asText(), mapValue);
+ obj = JSON_NODE_FACTORY.objectNode();
} else {
-
list.add(JSON_NODE_FACTORY.arrayNode().add(mapKey).add(mapValue));
+ list = JSON_NODE_FACTORY.arrayNode();
}
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ Schema keySchema = schema == null ? null :
schema.keySchema();
+ Schema valueSchema = schema == null ? null :
schema.valueSchema();
+ JsonNode mapKey = convertToJson(keySchema,
entry.getKey());
+ JsonNode mapValue = convertToJson(valueSchema,
entry.getValue());
+
+ if (objectMode) {
+ obj.set(mapKey.asText(), mapValue);
Review Comment:
Notice
##########
flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java:
##########
@@ -111,65 +116,74 @@ private JsonNode convertToJson(Schema schema, Object
value) throws DorisExceptio
} else if (value instanceof BigDecimal) {
return JSON_NODE_FACTORY.numberNode((BigDecimal)
value);
} else {
- throw new DorisException("Invalid type for bytes type:
" + value.getClass());
+ throw new DorisException(
+ "Invalid type for bytes type: " +
value.getClass());
}
- case ARRAY: {
- Collection<?> collection = (Collection<?>) value;
- ArrayNode list = JSON_NODE_FACTORY.arrayNode();
- for (Object elem : collection) {
- Schema valueSchema = schema == null ? null :
schema.valueSchema();
- JsonNode fieldValue = convertToJson(valueSchema, elem);
- list.add(fieldValue);
+ case ARRAY:
+ {
+ Collection<?> collection = (Collection<?>) value;
+ ArrayNode list = JSON_NODE_FACTORY.arrayNode();
+ for (Object elem : collection) {
+ Schema valueSchema = schema == null ? null :
schema.valueSchema();
+ JsonNode fieldValue = convertToJson(valueSchema,
elem);
+ list.add(fieldValue);
+ }
+ return list;
}
- return list;
- }
- case MAP: {
- Map<?, ?> map = (Map<?, ?>) value;
- // If true, using string keys and JSON object; if false,
using non-string keys and Array-encoding
- boolean objectMode;
- if (schema == null) {
- objectMode = true;
- for (Map.Entry<?, ?> entry : map.entrySet()) {
- if (!(entry.getKey() instanceof String)) {
- objectMode = false;
- break;
+ case MAP:
+ {
+ Map<?, ?> map = (Map<?, ?>) value;
+ // If true, using string keys and JSON object; if
false, using non-string
+ // keys and Array-encoding
+ boolean objectMode;
+ if (schema == null) {
+ objectMode = true;
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ if (!(entry.getKey() instanceof String)) {
+ objectMode = false;
+ break;
+ }
}
+ } else {
+ objectMode = schema.keySchema().type() ==
Schema.Type.STRING;
}
- } else {
- objectMode = schema.keySchema().type() ==
Schema.Type.STRING;
- }
- ObjectNode obj = null;
- ArrayNode list = null;
- if (objectMode) {
- obj = JSON_NODE_FACTORY.objectNode();
- } else {
- list = JSON_NODE_FACTORY.arrayNode();
- }
- for (Map.Entry<?, ?> entry : map.entrySet()) {
- Schema keySchema = schema == null ? null :
schema.keySchema();
- Schema valueSchema = schema == null ? null :
schema.valueSchema();
- JsonNode mapKey = convertToJson(keySchema,
entry.getKey());
- JsonNode mapValue = convertToJson(valueSchema,
entry.getValue());
-
+ ObjectNode obj = null;
+ ArrayNode list = null;
if (objectMode) {
- obj.set(mapKey.asText(), mapValue);
+ obj = JSON_NODE_FACTORY.objectNode();
Review Comment:
As expected: because of the earlier reformatting, the code that was on line `155` is now on line `164`.
##########
flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java:
##########
@@ -43,23 +43,28 @@ public class DateToStringConverter implements
CustomConverter<SchemaBuilder, Rel
private DateTimeFormatter timestampFormatter =
DateTimeFormatter.ISO_DATE_TIME;
private ZoneId timestampZoneId = ZoneId.systemDefault();
- public static Properties DEFAULT_PROPS = new Properties();
+ public static Properties defaultProps = new Properties();
Review Comment:
That's a good idea.
##########
flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java:
##########
@@ -128,11 +140,13 @@ public DataStreamSource<String>
buildCdcSource(StreamExecutionEnvironment env) {
Preconditions.checkNotNull(databaseName, "database-name in oracle is
required");
Preconditions.checkNotNull(schemaName, "schema-name in oracle is
required");
String tableName = config.get(OracleSourceOptions.TABLE_NAME);
- //When debezium incrementally reads, it will be judged based on
regexp_like.
- //When the regular length exceeds 512, an error will be reported, like
ORA-12733: regular expression too long
- if(tableName.length() > 384){
- //max database name length 128
- tableName = StringUtils.isNullOrWhitespaceOnly(includingTables) ?
".*" : includingTables;
+ // When debezium incrementally reads, it will be judged based on
regexp_like.
+ // When the regular length exceeds 512, an error will be reported,
like ORA-12733: regular
+ // expression too long
Review Comment:
sure
##########
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java:
##########
@@ -143,7 +157,8 @@ public void abortPreCommit(String labelSuffix, long chkID)
throws Exception {
LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix,
chkID);
while (true) {
try {
- // TODO: According to label abort txn. Currently, it can only
be aborted based on txnid,
+ // TODO: According to label abort txn. Currently, it can only
be aborted based on
+ // txnid,
Review Comment:
sure
##########
flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java:
##########
@@ -63,7 +64,9 @@ public void deserialize(SourceRecord sourceRecord,
Collector<String> collector)
private JsonNode convertToJson(Schema schema, Object value) throws
DorisException {
if (value == null) {
- if (schema == null) // Any schema is valid and we don't have a
default, so treat this as an optional schema
+ if (schema
+ == null) // Any schema is valid and we don't have a
default, so treat this as an
+ // optional schema
Review Comment:
sure
##########
flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java:
##########
@@ -36,66 +37,76 @@
public class OracleDateConverter implements CustomConverter<SchemaBuilder,
RelationalColumn> {
private static final Logger log =
LoggerFactory.getLogger(OracleDateConverter.class);
- private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[
]*'(.*)'\\)", Pattern.CASE_INSENSITIVE);
- private static final Pattern TO_TIMESTAMP =
Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
- private static final Pattern TIMESTAMP_OR_DATE_REGEX =
Pattern.compile("^TIMESTAMP[(]\\d[)]$|^DATE$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern TO_DATE =
+ Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)",
Pattern.CASE_INSENSITIVE);
+ private static final Pattern TO_TIMESTAMP =
+ Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)",
Pattern.CASE_INSENSITIVE);
+ private static final Pattern TIMESTAMP_OR_DATE_REGEX =
+ Pattern.compile("^TIMESTAMP[(]\\d[)]$|^DATE$",
Pattern.CASE_INSENSITIVE);
private ZoneId timestampZoneId = ZoneId.systemDefault();
- public static Properties DEFAULT_PROPS = new Properties();
- private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
- private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
- private final DateTimeFormatter dateTimeV2Formatter =
DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
+ public static Properties defaultProps = new Properties();
+ private final String datetimePattern = "yyyy-MM-dd HH:mm:ss";
Review Comment:
ok
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]