danny0405 commented on code in PR #12796:
URL: https://github.com/apache/hudi/pull/12796#discussion_r1957018469


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java:
##########
@@ -154,88 +147,86 @@ public void initializeState(FunctionInitializationContext 
context) {
   }
 
   @Override
-  public void processElement(I value, Context ctx, Collector<O> out) throws 
Exception {
-    if (value instanceof IndexRecord) {
-      IndexRecord<?> indexRecord = (IndexRecord<?>) value;
-      this.indexState.update((HoodieRecordGlobalLocation) 
indexRecord.getCurrentLocation());
+  public void processElement(HoodieFlinkInternalRow value, Context ctx, 
Collector<HoodieFlinkInternalRow> out) throws Exception {
+    if (value.isIndexRecord()) {
+      this.indexState.update(
+          new Tuple2<>(
+              StringData.fromString(value.getPartitionPath()),
+              StringData.fromString(value.getFileId())));
     } else {
-      processRecord((HoodieRecord<?>) value, out);
+      processRecord(value, out);
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private void processRecord(HoodieRecord<?> record, Collector<O> out) throws 
Exception {
+  private void processRecord(HoodieFlinkInternalRow record, 
Collector<HoodieFlinkInternalRow> out) throws Exception {
     // 1. put the record into the BucketAssigner;
     // 2. look up the state for location, if the record has a location, just 
send it out;
     // 3. if it is an INSERT, decide the location using the BucketAssigner 
then send it out.
-    final HoodieKey hoodieKey = record.getKey();
-    final String recordKey = hoodieKey.getRecordKey();
-    final String partitionPath = hoodieKey.getPartitionPath();
-    final HoodieRecordLocation location;
+    String recordKey = record.getRecordKey();
+    String partition = record.getPartitionPath();
+    RowData row = record.getRowData();
 
+    Tuple2<String, String> location;
     if (isChangingRecords) {
       // Only changing records need looking up the index for the location,
       // append only records are always recognized as INSERT.
-      HoodieRecordGlobalLocation oldLoc = indexState.value();
-      if (oldLoc != null) {
+      // Structured as Tuple2(partition, fileId).
+      Tuple2<StringData, StringData> indexStateValue = indexState.value();
+      if (indexStateValue != null) {
         // Set up the instant time as "U" to mark the bucket as an update 
bucket.
-        if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
+        String partitionFromState = indexStateValue.getField(0).toString();
+        String fileIdFromState = indexStateValue.getField(1).toString();
+        if (!Objects.equals(partitionFromState, partition)) {
+          // [HUDI-8996] No delete records for Flink upsert if partition path 
changed

Review Comment:
   Fix it in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to