michael1991 commented on issue #7988:
URL: https://github.com/apache/hudi/issues/7988#issuecomment-1436228601
Hi @yihua, thanks for your quick response! Sorry for the late reply; it was
the weekend.
My custom payload implementation looks like this:
```java
package com.x.y.z.payload;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;

/**
 * Custom payload class to process incremental logs.
 */
public class CustomPayload extends OverwriteWithLatestAvroPayload {

    public CustomPayload(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
    }

    public CustomPayload(Option<GenericRecord> record) {
        super(record);
    }

    @Override
    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
        Option<IndexedRecord> recordOption = getInsertValue(schema);
        if (!recordOption.isPresent()) {
            return Option.empty();
        }
        GenericRecord insertRecord = (GenericRecord) recordOption.get();
        GenericRecord currentRecord = (GenericRecord) currentValue;
        if (isDeleteRecord(insertRecord)) {
            return Option.empty();
        }

        // Merge field by field: each column group has its own rule.
        final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
        List<Schema.Field> fields = schema.getFields();
        for (Schema.Field field : fields) {
            String fieldName = field.name();
            Object insertFieldValue = insertRecord.get(fieldName);
            Object currentFieldValue = currentRecord.get(field.pos());
            switch (fieldName) {
                case "double_price_1":
                case "double_price_2":
                    // Keep the lower price; a null or unparsable incoming price never wins.
                    Double insertPrice = toDouble(insertFieldValue);
                    Double currentPrice = toDouble(currentFieldValue);
                    if (insertPrice != null && (currentPrice == null || insertPrice < currentPrice)) {
                        builder.set(field, insertFieldValue);
                    } else {
                        builder.set(field, currentFieldValue);
                    }
                    break;
                case "int_flag_1":
                case "int_flag_2":
                    // A flag may move from 0 to 1 but never back.
                    // Flag columns are assumed to be non-null; a null here throws a NullPointerException.
                    int insertFlag = Integer.parseInt(insertFieldValue.toString());
                    int currentFlag = Integer.parseInt(currentFieldValue.toString());
                    builder.set(field, currentFlag == 0 && insertFlag == 1 ? insertFieldValue : currentFieldValue);
                    break;
                case "long_time_1":
                case "long_time_2":
                    // Keep the earlier timestamp; a null or unparsable incoming time never wins.
                    Long insertTime = toLong(insertFieldValue);
                    Long currentTime = toLong(currentFieldValue);
                    if (insertTime != null && (currentTime == null || insertTime < currentTime)) {
                        builder.set(field, insertFieldValue);
                    } else {
                        builder.set(field, currentFieldValue);
                    }
                    break;
                default:
                    // Hudi meta columns always take the incoming value;
                    // other columns are filled only when the stored value is null.
                    if (HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(fieldName) || (insertFieldValue != null && currentFieldValue == null)) {
                        builder.set(field, insertFieldValue);
                    } else {
                        builder.set(field, currentFieldValue);
                    }
                    break;
            }
        }
        return Option.of(builder.build());
    }

    /** Parses a value as a Double; returns null for null or unparsable input. */
    private Double toDouble(Object object) {
        if (object == null) {
            return null;
        }
        try {
            return Double.parseDouble(object.toString());
        } catch (Exception exception) {
            return null;
        }
    }

    /** Parses a value as a Long; returns null for null or unparsable input. */
    private Long toLong(Object object) {
        if (object == null) {
            return null;
        }
        try {
            return Long.parseLong(object.toString());
        } catch (Exception exception) {
            return null;
        }
    }
}
```
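Since I can't share a unit test, here is a minimal sketch of the per-field merge rules from the payload above, pulled out into plain Java so they can be checked without Hudi or Avro. The class name `MergeRulesSketch` and the helper names `keepMin`, `latchFlag`, and `fillIfMissing` are hypothetical and exist only for illustration. The sketch works on already-typed values, so it skips the string parsing that `toDouble` and `toLong` do, and it leaves out the meta-column case.

```java
final class MergeRulesSketch {

    // Price and time columns: keep the smaller value.
    // A null incoming value never replaces the stored one.
    static <T extends Comparable<T>> T keepMin(T incoming, T current) {
        if (incoming != null && (current == null || incoming.compareTo(current) < 0)) {
            return incoming;
        }
        return current;
    }

    // Flag columns: a flag may move from 0 to 1 but never back.
    static int latchFlag(int incoming, int current) {
        return (current == 0 && incoming == 1) ? incoming : current;
    }

    // All other non-meta columns: fill only when the stored value is null.
    static <T> T fillIfMissing(T incoming, T current) {
        return (incoming != null && current == null) ? incoming : current;
    }

    public static void main(String[] args) {
        Double lowerPrice = keepMin(9.5, 12.0);
        Double keptPrice = keepMin(null, 12.0);
        int flag = latchFlag(0, 1);
        String filled = fillIfMissing("new", null);

        System.out.println(lowerPrice); // 9.5
        System.out.println(keptPrice);  // 12.0
        System.out.println(flag);       // 1
        System.out.println(filled);     // new
    }
}
```

In the real payload the comparison runs on the parsed value while the builder stores the original Avro field value, so the stored type stays whatever the schema declares.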
**Sample schema**: key, ts, int_flag_1, double_price_1, long_time_1,
int_flag_2, double_price_2, long_time_2

- key: record key
- ts: precombine column, though it is not needed for our scenario
- int_flag_*: 0 or 1, int type
- double_price_*: price, double type
- long_time_*: timestamp
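As a rough sketch, the field roles above might map to Hudi writer options as follows. This assumes the table is written through the Spark datasource, which this comment does not actually show; the table name `sample_table` and the class `WriterOptionsSketch` are hypothetical. The option keys are the standard Hudi datasource write configs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WriterOptionsSketch {

    // Builds the write options that tie the sample schema to the custom payload.
    static Map<String, String> hudiOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        // Hypothetical table name, for illustration only.
        options.put("hoodie.table.name", "sample_table");
        // "key" is the record key column.
        options.put("hoodie.datasource.write.recordkey.field", "key");
        // "ts" is the precombine column.
        options.put("hoodie.datasource.write.precombine.field", "ts");
        // Merge logic is delegated to the custom payload class.
        options.put("hoodie.datasource.write.payload.class", "com.x.y.z.payload.CustomPayload");
        return options;
    }

    public static void main(String[] args) {
        hudiOptions().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With Spark, a map like this would typically be passed to the writer via `.options(...)` before `.save(...)`.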
Sorry, I can't share sample data or a unit test.
BTW, I looked at
[PartialUpdateAvroPayload](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java)
before, but we need more flexible update logic for our scenario, so we are
trying to implement it ourselves.