This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2a66bc3fb0f913c1ed307b35fd05473917155a71 Author: Vova Kolmakov <[email protected]> AuthorDate: Thu Apr 25 07:21:46 2024 +0700 [HUDI-7660] Fix excessive object creation in RowDataKeyGen (#11084) --- .../org/apache/hudi/sink/bulk/RowDataKeyGen.java | 45 ++++++++++++---------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java index a9f34b36d27..c377575db5e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java @@ -56,6 +56,8 @@ public class RowDataKeyGen implements Serializable { private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__"; private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; + private static final String HIVE_PARTITION_TEMPLATE = "%s=%s"; + private static final String DEFAULT_FIELD_SEPARATOR = ","; private final String[] recordKeyFields; private final String[] partitionPathFields; @@ -86,7 +88,7 @@ public class RowDataKeyGen implements Serializable { boolean encodePartitionPath, boolean consistentLogicalTimestampEnabled, Option<TimestampBasedAvroKeyGenerator> keyGenOpt) { - this.partitionPathFields = partitionFields.split(","); + this.partitionPathFields = partitionFields.split(DEFAULT_FIELD_SEPARATOR); this.hiveStylePartitioning = hiveStylePartitioning; this.encodePartitionPath = encodePartitionPath; this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled; @@ -98,7 +100,7 @@ public class RowDataKeyGen implements Serializable { this.recordKeyFields = null; this.recordKeyProjection = null; } else { - this.recordKeyFields = recordKeys.get().split(","); + this.recordKeyFields = recordKeys.get().split(DEFAULT_FIELD_SEPARATOR); if (this.recordKeyFields.length == 1) { // efficient code path this.simpleRecordKey = true; @@ -166,7 +168,7 @@ public class RowDataKeyGen implements Serializable { } } - // reference: org.apache.hudi.keygen.KeyGenUtils.getRecordPartitionPath + // reference: org.apache.hudi.keygen.KeyGenUtils.getRecordKey private static String getRecordKey(Object[] keyValues, String[] keyFields, boolean consistentLogicalTimestampEnabled) { boolean keyIsNullEmpty = true; StringBuilder recordKey = new StringBuilder(); @@ -176,28 +178,28 @@ public class RowDataKeyGen implements Serializable { value = getTimestampValue(consistentLogicalTimestampEnabled, value); String recordKeyValue = StringUtils.objToString(value); if (recordKeyValue == null) { - recordKey.append(recordKeyField).append(":").append(NULL_RECORDKEY_PLACEHOLDER).append(","); + recordKey.append(recordKeyField).append(":").append(NULL_RECORDKEY_PLACEHOLDER); } else if (recordKeyValue.isEmpty()) { - recordKey.append(recordKeyField).append(":").append(EMPTY_RECORDKEY_PLACEHOLDER).append(","); + recordKey.append(recordKeyField).append(":").append(EMPTY_RECORDKEY_PLACEHOLDER); } else { - recordKey.append(recordKeyField).append(":").append(recordKeyValue).append(","); + recordKey.append(recordKeyField).append(":").append(recordKeyValue); keyIsNullEmpty = false; } + if (i != keyValues.length - 1) { + recordKey.append(DEFAULT_FIELD_SEPARATOR); + } } - recordKey.deleteCharAt(recordKey.length() - 1); if (keyIsNullEmpty) { - throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: " - + Arrays.toString(keyFields) + " cannot be entirely null or empty."); + throw new HoodieKeyException(String.format("recordKey values: \"%s\" for fields: %s cannot be entirely null or empty.", + recordKey, Arrays.toString(keyFields))); } return recordKey.toString(); } private static Object getTimestampValue(boolean consistentLogicalTimestampEnabled, Object value) { - if (!consistentLogicalTimestampEnabled) { - if (value instanceof TimestampData) { - TimestampData timestampData = (TimestampData) value; - value = timestampData.toTimestamp().toInstant().toEpochMilli(); - } + if (!consistentLogicalTimestampEnabled && (value instanceof TimestampData)) { + TimestampData timestampData = (TimestampData) value; + value = timestampData.toTimestamp().toInstant().toEpochMilli(); } return value; } @@ -213,17 +215,17 @@ public class RowDataKeyGen implements Serializable { String partField = partFields[i]; String partValue = StringUtils.objToString(partValues[i]); if (partValue == null || partValue.isEmpty()) { - partitionPath.append(hiveStylePartitioning ? partField + "=" + DEFAULT_PARTITION_PATH - : DEFAULT_PARTITION_PATH); + partitionPath.append(hiveStylePartitioning ? String.format(HIVE_PARTITION_TEMPLATE, partField, DEFAULT_PARTITION_PATH) : DEFAULT_PARTITION_PATH); } else { if (encodePartitionPath) { partValue = escapePathName(partValue); } - partitionPath.append(hiveStylePartitioning ? partField + "=" + partValue : partValue); + partitionPath.append(hiveStylePartitioning ? String.format(HIVE_PARTITION_TEMPLATE, partField, partValue) : partValue); + } + if (i != partFields.length - 1) { + partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); } - partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); } - partitionPath.deleteCharAt(partitionPath.length() - 1); return partitionPath.toString(); } @@ -232,7 +234,8 @@ public class RowDataKeyGen implements Serializable { recordKeyValue = getTimestampValue(consistentLogicalTimestampEnabled, recordKeyValue); String recordKey = StringUtils.objToString(recordKeyValue); if (recordKey == null || recordKey.isEmpty()) { - throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); + throw new HoodieKeyException(String.format("recordKey value: \"%s\" for field: \"%s\" cannot be null or empty.", + recordKey, recordKeyField)); } return recordKey; } @@ -256,7 +259,7 @@ public class RowDataKeyGen implements Serializable { partitionPath = escapePathName(partitionPath); } if (hiveStylePartitioning) { - partitionPath = partField + "=" + partitionPath; + partitionPath = String.format(HIVE_PARTITION_TEMPLATE, partField, partitionPath); } return partitionPath; }
