tibrewalpratik17 commented on code in PR #13103:
URL: https://github.com/apache/pinot/pull/13103#discussion_r1608866013
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java:
##########
@@ -36,47 +37,84 @@
* </ul>
* <p>NOTE: should put this after the {@link DataTypeTransformer} so that all values follow the data types in
* {@link FieldSpec}.
+ * This uses the MaxLengthExceedStrategy in the {@link FieldSpec} to decide what to do when the value exceeds the max.
+ * For TRIM_LENGTH, the value is trimmed to the max length.
+ * For SUBSTITUTE_DEFAULT_VALUE, the value is replaced with the default null value string.
+ * For FAIL_INGESTION, an exception is thrown and the record is skipped.
+ * In the first 2 scenarios, this metric INCOMPLETE_REALTIME_ROWS_CONSUMED can be tracked to know if a trimmed /
+ * default record was persisted.
+ * In the last scenario, this metric ROWS_WITH_ERRORS can be tracked to know if a record was skipped.
*/
public class SanitizationTransformer implements RecordTransformer {
- private final Map<String, Integer> _stringColumnMaxLengthMap = new HashMap<>();
+ private static final String NULL_CHARACTER = "\0";
+ private final Map<String, FieldSpec> _stringColumnToFieldSpecMap = new HashMap<>();
public SanitizationTransformer(Schema schema) {
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
if (!fieldSpec.isVirtualColumn() && fieldSpec.getDataType() == DataType.STRING) {
- _stringColumnMaxLengthMap.put(fieldSpec.getName(), fieldSpec.getMaxLength());
+ _stringColumnToFieldSpecMap.put(fieldSpec.getName(), fieldSpec);
}
}
}
@Override
public boolean isNoOp() {
- return _stringColumnMaxLengthMap.isEmpty();
+ return _stringColumnToFieldSpecMap.isEmpty();
}
@Override
public GenericRow transform(GenericRow record) {
- for (Map.Entry<String, Integer> entry : _stringColumnMaxLengthMap.entrySet()) {
+ for (Map.Entry<String, FieldSpec> entry : _stringColumnToFieldSpecMap.entrySet()) {
String stringColumn = entry.getKey();
- int maxLength = entry.getValue();
Object value = record.getValue(stringColumn);
if (value instanceof String) {
// Single-valued column
- String stringValue = (String) value;
- String sanitizedValue = StringUtil.sanitizeStringValue(stringValue, maxLength);
- // NOTE: reference comparison
- //noinspection StringEquality
- if (sanitizedValue != stringValue) {
- record.putValue(stringColumn, sanitizedValue);
+ Pair<String, Boolean> result = sanitizeValue(stringColumn, (String) value, entry.getValue());
+ record.putValue(stringColumn, result.getLeft());
+ if (result.getRight()) {
+ record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
}
} else {
// Multi-valued column
Object[] values = (Object[]) value;
- int numValues = values.length;
- for (int i = 0; i < numValues; i++) {
- values[i] = StringUtil.sanitizeStringValue(values[i].toString(), maxLength);
+ for (int i = 0; i < values.length; i++) {
+ Pair<String, Boolean> result = sanitizeValue(stringColumn, values[i].toString(), entry.getValue());
+ values[i] = result.getLeft();
+ if (result.getRight()) {
+ record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
+ }
}
}
}
return record;
}
+
+ private Pair<String, Boolean> sanitizeValue(String stringColumn, String value, FieldSpec columnFieldSpec) {
Review Comment:
Added javadocs
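
For readers following the thread, the three strategies described in the class Javadoc can be sketched roughly as below. This is only a standalone illustration, not the code in this PR: the class `MaxLengthSketch`, the local `MaxLengthExceedStrategy` enum, the `sanitize` helper, its parameters, and the choice of `IllegalStateException` are all assumptions made for the example. It returns a `Map.Entry` in place of the `Pair<String, Boolean>` used in the diff and leaves out the null-character handling.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class MaxLengthSketch {
  // Hypothetical stand-in for the strategy named in the Javadoc.
  enum MaxLengthExceedStrategy { TRIM_LENGTH, SUBSTITUTE_DEFAULT_VALUE, FAIL_INGESTION }

  /**
   * Returns the (possibly modified) value and a flag that is true when the
   * value was changed, mirroring the Pair<String, Boolean> shape in the diff.
   */
  static Map.Entry<String, Boolean> sanitize(String value, int maxLength, String defaultNullValue,
      MaxLengthExceedStrategy strategy) {
    if (value.length() <= maxLength) {
      // Within the limit: keep the value, nothing to flag.
      return new SimpleEntry<>(value, false);
    }
    switch (strategy) {
      case TRIM_LENGTH:
        // Keep only the first maxLength characters.
        return new SimpleEntry<>(value.substring(0, maxLength), true);
      case SUBSTITUTE_DEFAULT_VALUE:
        // Replace the whole value with the column's default null value.
        return new SimpleEntry<>(defaultNullValue, true);
      case FAIL_INGESTION:
        // Assumed exception type; the caller is expected to skip the record.
        throw new IllegalStateException("Value length " + value.length() + " exceeds max length " + maxLength);
      default:
        throw new IllegalArgumentException("Unknown strategy: " + strategy);
    }
  }

  public static void main(String[] args) {
    System.out.println(sanitize("abcdef", 3, "null", MaxLengthExceedStrategy.TRIM_LENGTH));
    System.out.println(sanitize("abcdef", 3, "null", MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE));
  }
}
```

In the transformer itself, a true flag is what leads to `GenericRow.INCOMPLETE_RECORD_KEY` being set on the record, as shown in the hunk above.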
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]