geserdugarov commented on code in PR #12796:
URL: https://github.com/apache/hudi/pull/12796#discussion_r1957734844
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java:
##########
@@ -154,88 +147,86 @@ public void initializeState(FunctionInitializationContext
context) {
}
@Override
- public void processElement(I value, Context ctx, Collector<O> out) throws
Exception {
- if (value instanceof IndexRecord) {
- IndexRecord<?> indexRecord = (IndexRecord<?>) value;
- this.indexState.update((HoodieRecordGlobalLocation)
indexRecord.getCurrentLocation());
+ public void processElement(HoodieFlinkInternalRow value, Context ctx,
Collector<HoodieFlinkInternalRow> out) throws Exception {
+ if (value.isIndexRecord()) {
+ this.indexState.update(
+ new Tuple2<>(
+ StringData.fromString(value.getPartitionPath()),
+ StringData.fromString(value.getFileId())));
} else {
- processRecord((HoodieRecord<?>) value, out);
+ processRecord(value, out);
}
}
- @SuppressWarnings("unchecked")
- private void processRecord(HoodieRecord<?> record, Collector<O> out) throws
Exception {
+ private void processRecord(HoodieFlinkInternalRow record,
Collector<HoodieFlinkInternalRow> out) throws Exception {
// 1. put the record into the BucketAssigner;
// 2. look up the state for location, if the record has a location, just
send it out;
// 3. if it is an INSERT, decide the location using the BucketAssigner
then send it out.
- final HoodieKey hoodieKey = record.getKey();
- final String recordKey = hoodieKey.getRecordKey();
- final String partitionPath = hoodieKey.getPartitionPath();
- final HoodieRecordLocation location;
+ String recordKey = record.getRecordKey();
+ String partition = record.getPartitionPath();
+ RowData row = record.getRowData();
+ Tuple2<String, String> location;
if (isChangingRecords) {
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
- HoodieRecordGlobalLocation oldLoc = indexState.value();
- if (oldLoc != null) {
+ // Structured as Tuple(partition, fileId, instantTime).
+ Tuple2<StringData, StringData> indexStateValue = indexState.value();
+ if (indexStateValue != null) {
// Set up the instant time as "U" to mark the bucket as an update
bucket.
- if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
+ String partitionFromState = indexStateValue.getField(0).toString();
+ String fileIdFromState = indexStateValue.getField(1).toString();
+ if (!Objects.equals(partitionFromState, partition)) {
+ // [HUDI-8996] No delete records for Flink upsert if partition path
changed
Review Comment:
Fixed by 17f7f47f1da0ea81c6593f6dee29fb2a4bc8f3a1.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java:
##########
@@ -81,8 +79,12 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
* <li>If it does, tag the record with the location</li>
* <li>If it does not, use the {@link BucketAssigner} to generate a new
bucket ID</li>
* </ul>
+ *
+ * ValueState here is structured as Tuple2(partition, fileId).
+ * We use Flink Tuple2 because this state will be serialized/deserialized.
+ * Otherwise, for any chosen data structure we should implement custom
serializer.
*/
- private ValueState<HoodieRecordGlobalLocation> indexState;
+ private ValueState<Tuple2<StringData, StringData>> indexState;
Review Comment:
My bad — this place actually doesn't require optimization, because here serde
will be executed only during checkpoints, which is negligible in comparison
with the serde cost of each stream record.
I've rolled back the previous implementation of `indexState` in
17f7f47f1da0ea81c6593f6dee29fb2a4bc8f3a1, which uses
`HoodieRecordGlobalLocation`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]