geserdugarov commented on code in PR #12796:
URL: https://github.com/apache/hudi/pull/12796#discussion_r1957734844


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java:
##########
@@ -154,88 +147,86 @@ public void initializeState(FunctionInitializationContext context) {
   }
 
   @Override
-  public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
-    if (value instanceof IndexRecord) {
-      IndexRecord<?> indexRecord = (IndexRecord<?>) value;
-      this.indexState.update((HoodieRecordGlobalLocation) 
indexRecord.getCurrentLocation());
+  public void processElement(HoodieFlinkInternalRow value, Context ctx, Collector<HoodieFlinkInternalRow> out) throws Exception {
+    if (value.isIndexRecord()) {
+      this.indexState.update(
+          new Tuple2<>(
+              StringData.fromString(value.getPartitionPath()),
+              StringData.fromString(value.getFileId())));
     } else {
-      processRecord((HoodieRecord<?>) value, out);
+      processRecord(value, out);
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
+  private void processRecord(HoodieFlinkInternalRow record, Collector<HoodieFlinkInternalRow> out) throws Exception {
     // 1. put the record into the BucketAssigner;
    // 2. look up the state for location, if the record has a location, just send it out;
    // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
-    final HoodieKey hoodieKey = record.getKey();
-    final String recordKey = hoodieKey.getRecordKey();
-    final String partitionPath = hoodieKey.getPartitionPath();
-    final HoodieRecordLocation location;
+    String recordKey = record.getRecordKey();
+    String partition = record.getPartitionPath();
+    RowData row = record.getRowData();
 
+    Tuple2<String, String> location;
     if (isChangingRecords) {
       // Only changing records need looking up the index for the location,
       // append only records are always recognized as INSERT.
-      HoodieRecordGlobalLocation oldLoc = indexState.value();
-      if (oldLoc != null) {
+      // Structured as Tuple2(partition, fileId).
+      Tuple2<StringData, StringData> indexStateValue = indexState.value();
+      if (indexStateValue != null) {
        // Set up the instant time as "U" to mark the bucket as an update bucket.
-        if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
+        String partitionFromState = indexStateValue.getField(0).toString();
+        String fileIdFromState = indexStateValue.getField(1).toString();
+        if (!Objects.equals(partitionFromState, partition)) {
+          // [HUDI-8996] No delete records for Flink upsert if partition path changed

Review Comment:
   Fixed by 17f7f47f1da0ea81c6593f6dee29fb2a4bc8f3a1.
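
   The hunk above compares the partition remembered in the index state against the incoming record's partition. As a self-contained toy model of that decision (all class and method names below are invented for illustration, not Hudi's API), the global-index behavior discussed in HUDI-8996 can be sketched as:

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Toy model (not the Hudi implementation) of a global index reacting to a
   // partition-path change: when the incoming partition differs from the one
   // remembered in state, it can emit a delete for the old partition plus an
   // insert into the new one, or skip the delete (the HUDI-8996 case).
   class GlobalIndexToy {
     // recordKey -> last known partition; stands in for the Flink ValueState
     private final Map<String, String> indexState = new HashMap<>();

     /** Returns the operations to emit for one incoming (key, partition) pair. */
     List<String> process(String recordKey, String partition, boolean emitDeletes) {
       List<String> out = new ArrayList<>();
       String oldPartition = indexState.get(recordKey);
       if (oldPartition == null) {
         // never seen before: plain insert
         out.add("INSERT " + recordKey + "@" + partition);
       } else if (!oldPartition.equals(partition)) {
         // partition path changed under a global index
         if (emitDeletes) {
           out.add("DELETE " + recordKey + "@" + oldPartition);
         }
         out.add("INSERT " + recordKey + "@" + partition);
       } else {
         // same partition: update in place
         out.add("UPDATE " + recordKey + "@" + partition);
       }
       indexState.put(recordKey, partition);
       return out;
     }
   }
   ```

   For example, processing `("k1", "p1")` and then `("k1", "p2")` with deletes enabled yields an insert followed by a delete-plus-insert pair.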



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java:
##########
@@ -81,8 +79,12 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
    *   <li>If it does, tag the record with the location</li>
   *   <li>If it does not, use the {@link BucketAssigner} to generate a new bucket ID</li>
    * </ul>
+   *
+   * ValueState here is structured as Tuple2(partition, fileId).
+   * We use Flink Tuple2 because this state will be serialized/deserialized.
+   * Otherwise, for any other chosen data structure we would need to implement a custom serializer.
    */
-  private ValueState<HoodieRecordGlobalLocation> indexState;
+  private ValueState<Tuple2<StringData, StringData>> indexState;

Review Comment:
   My bad, this place actually doesn't require optimization, because serde here is executed only during checkpoints, which is negligible compared with the serde cost of each stream record.
   
   I've rolled back the previous implementation of `indexState` in 17f7f47f1da0ea81c6593f6dee29fb2a4bc8f3a1, which uses `HoodieRecordGlobalLocation`.
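
   The cost argument in this comment — heap-backed value state is read and written as plain Java objects, and only gets serialized when a checkpoint snapshots it — can be illustrated with a small stand-alone sketch (invented names, not Flink or Hudi code):

   ```java
   // Toy model of checkpoint-only serde: per-record processing touches an
   // in-memory field with no serialization, while checkpoint() "serializes"
   // the value once. The serde count therefore scales with the number of
   // checkpoints, not the number of stream records.
   class SerdeCostToy {
     long serdeCalls = 0;
     private String state; // in-memory value; reads/updates involve no serde

     void processRecord(String newValue) {
       state = newValue; // plain field write, nothing serialized
     }

     void checkpoint() {
       // Pretend to serialize the current value once per checkpoint snapshot.
       byte[] snapshot = state == null ? new byte[0] : state.getBytes();
       serdeCalls += snapshot.length >= 0 ? 1 : 0;
     }
   }
   ```

   Processing 100,000 records with a checkpoint every 10,000 leaves `serdeCalls` at 10, which is why a cheaper-to-serialize state type buys little here.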



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to