This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d116f1a685 [INLONG-9231][Sort] Find no audit time field when the field
is in upper case (#9232)
d116f1a685 is described below
commit d116f1a6859316aaf22fcab604246d18971fb5c3
Author: vernedeng <[email protected]>
AuthorDate: Wed Nov 8 11:30:51 2023 +0800
[INLONG-9231][Sort] Find no audit time field when the field is in upper
case (#9232)
---
.../apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java
index cd303c0234..dfadc07334 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java
@@ -32,6 +32,7 @@ import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -49,13 +50,14 @@ public class SinkMetadataUtils implements Serializable {
public SinkMetadataUtils(List<String> metadataKeys, DataType
consumedDataType) {
Set<String> metadataKeySet = ImmutableSet.copyOf(metadataKeys);
List<RowType.RowField> metaFields = ((RowType)
consumedDataType.getLogicalType()).getFields();
-
+ List<String> names =
metaFields.stream().map(RowType.RowField::getName).collect(Collectors.toList());
+ log.info("start to config SinkMetadataUtils, metaKeys={}, consume
fields={}", metadataKeys, names);
// get related converters by real keys
// the pos of physical column will be replaced by
IcebergWritableMetadata.NULL
this.converters = metaFields.stream()
.map(RowType.RowField::getName)
.map(key -> Stream.of(IcebergWritableMetadata.values())
- .filter(m -> m.getKey().equals(key))
+ .filter(m -> m.getKey().equalsIgnoreCase(key))
.findFirst()
.orElse(IcebergWritableMetadata.NULL))
.map(IcebergWritableMetadata::getConverter)
@@ -65,8 +67,8 @@ public class SinkMetadataUtils implements Serializable {
ImmutableBiMap.Builder<String, Integer> builder =
ImmutableBiMap.builder();
for (int i = 0; i < metaFields.size(); i++) {
String name = metaFields.get(i).getName();
- if (metadataKeySet.contains(name)) {
- builder.put(name, i);
+ if (metadataKeySet.contains(name.toLowerCase())) {
+ builder.put(name.toLowerCase(), i);
}
}
this.field2posMap = builder.build();
@@ -74,6 +76,7 @@ public class SinkMetadataUtils implements Serializable {
// for audit time
DATA_TIME_INDEX =
field2posMap.getOrDefault(Constants.META_AUDIT_DATA_TIME, -1);
+ log.info("find data time index={}, filed2posMap={}", DATA_TIME_INDEX,
field2posMap);
}
public Integer getMetadataPosByName(String name) {