This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 46a2399  [HUDI-1902] Global index for flink writer (#2958)
46a2399 is described below

commit 46a2399a45803e1b0d863eec47168a09c37ab920
Author: Danny Chan <[email protected]>
AuthorDate: Tue May 18 13:55:38 2021 +0800

    [HUDI-1902] Global index for flink writer (#2958)
    
    Supports deduplication for record keys with different partition path.
---
 .../apache/hudi/common/model/BaseAvroPayload.java  |   2 +-
 .../common/model/HoodieRecordGlobalLocation.java   |  97 ++++++++++++++++++++
 .../hudi/common/model/HoodieRecordLocation.java    |   4 +-
 .../apache/hudi/configuration/FlinkOptions.java    |   7 ++
 .../sink/partitioner/BucketAssignFunction.java     | 100 +++++++++++++++------
 .../sink/transform/RowDataToHoodieFunction.java    |  59 +-----------
 .../apache/hudi/sink/utils/PayloadCreation.java    |  93 +++++++++++++++++++
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  50 +++++++++++
 .../test/java/org/apache/hudi/utils/TestData.java  |  14 ++-
 hudi-flink/src/test/resources/test_source_4.data   |   8 ++
 10 files changed, 345 insertions(+), 89 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
index 3b35b0d..cd3a95e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
@@ -37,7 +37,7 @@ public abstract class BaseAvroPayload implements Serializable 
{
   /**
    * For purposes of preCombining.
    */
-  protected final Comparable orderingVal;
+  public final Comparable orderingVal;
 
   /**
    * Instantiate {@link BaseAvroPayload}.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
new file mode 100644
index 0000000..f469a1a
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Similar to {@link org.apache.hudi.common.model.HoodieRecordLocation} but 
with partition path.
+ */
+public class HoodieRecordGlobalLocation extends HoodieRecordLocation {
+  private static final long serialVersionUID = 1L;
+
+  private String partitionPath;
+
+  public HoodieRecordGlobalLocation() {
+  }
+
+  public HoodieRecordGlobalLocation(String partitionPath, String instantTime, 
String fileId) {
+    super(instantTime, fileId);
+    this.partitionPath = partitionPath;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("HoodieGlobalRecordLocation {");
+    sb.append("partitionPath=").append(partitionPath).append(", ");
+    sb.append("instantTime=").append(instantTime).append(", ");
+    sb.append("fileId=").append(fileId);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    HoodieRecordGlobalLocation otherLoc = (HoodieRecordGlobalLocation) o;
+    return Objects.equals(partitionPath, otherLoc.partitionPath)
+        && Objects.equals(instantTime, otherLoc.instantTime)
+        && Objects.equals(fileId, otherLoc.fileId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(partitionPath, instantTime, fileId);
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public void setPartitionPath(String partitionPath) {
+    this.partitionPath = partitionPath;
+  }
+
+  /**
+   * Returns the global record location from local.
+   */
+  public static HoodieRecordGlobalLocation fromLocal(String partitionPath, 
HoodieRecordLocation localLoc) {
+    return new HoodieRecordGlobalLocation(partitionPath, 
localLoc.getInstantTime(), localLoc.getFileId());
+  }
+
+  /**
+   * Returns the record location as local.
+   */
+  public HoodieRecordLocation toLocal(String instantTime) {
+    return new HoodieRecordLocation(instantTime, fileId);
+  }
+
+  /**
+   * Copy the location with given partition path.
+   */
+  public HoodieRecordGlobalLocation copy(String partitionPath) {
+    return new HoodieRecordGlobalLocation(partitionPath, instantTime, fileId);
+  }
+}
+
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
index 1692cfb..2b1feab 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
@@ -26,8 +26,8 @@ import java.util.Objects;
  */
 public class HoodieRecordLocation implements Serializable {
 
-  private String instantTime;
-  private String fileId;
+  protected String instantTime;
+  protected String fileId;
 
   public HoodieRecordLocation() {
   }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java 
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 4bed143..33a16c0 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -86,6 +86,13 @@ public class FlinkOptions {
       .defaultValue(1.5D)
       .withDescription("Index state ttl in days, default 1.5 day");
 
+  public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = 
ConfigOptions
+      .key("index.global.enabled")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether to update index for the old partition path\n"
+          + "if same key record with different partition path came in, default 
true");
+
   // ------------------------------------------------------------------------
   //  Read Options
   // ------------------------------------------------------------------------
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 0f599d2..5ad20d5 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -21,9 +21,11 @@ package org.apache.hudi.sink.partitioner;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.BaseAvroPayload;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -32,6 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.sink.utils.PayloadCreation;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.util.StreamerUtil;
@@ -90,7 +93,7 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
    *   <li>If it does not, use the {@link BucketAssigner} to generate a new 
bucket ID</li>
    * </ul>
    */
-  private MapState<HoodieKey, HoodieRecordLocation> indexState;
+  private MapState<String, HoodieRecordGlobalLocation> indexState;
 
   /**
    * Bucket assigner to assign new bucket IDs or reuse existing ones.
@@ -110,11 +113,23 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
    */
   private MapState<String, Integer> partitionLoadState;
 
+  /**
+   * Used to create DELETE payload.
+   */
+  private PayloadCreation payloadCreation;
+
+  /**
+   * If the index is global, update the index for the old partition path
+   * if same key record with different partition path came in.
+   */
+  private final boolean globalIndex;
+
   public BucketAssignFunction(Configuration conf) {
     this.conf = conf;
     this.isChangingRecords = WriteOperationType.isChangingRecords(
         WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
     this.bootstrapIndex = 
conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
+    this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
   }
 
   @Override
@@ -132,6 +147,7 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
         context,
         writeConfig);
+    this.payloadCreation = PayloadCreation.instance(this.conf);
   }
 
   @Override
@@ -141,11 +157,11 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
 
   @Override
   public void initializeState(FunctionInitializationContext context) {
-    MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexStateDesc =
+    MapStateDescriptor<String, HoodieRecordGlobalLocation> indexStateDesc =
         new MapStateDescriptor<>(
             "indexState",
-            TypeInformation.of(HoodieKey.class),
-            TypeInformation.of(HoodieRecordLocation.class));
+            Types.STRING,
+            TypeInformation.of(HoodieRecordGlobalLocation.class));
     double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 
1000;
     if (ttl > 0) {
       
indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
@@ -166,38 +182,41 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
     // 3. if it is an INSERT, decide the location using the BucketAssigner 
then send it out.
     HoodieRecord<?> record = (HoodieRecord<?>) value;
     final HoodieKey hoodieKey = record.getKey();
-    final BucketInfo bucketInfo;
+    final String recordKey = hoodieKey.getRecordKey();
+    final String partitionPath = hoodieKey.getPartitionPath();
     final HoodieRecordLocation location;
 
     // The dataset may be huge, thus the processing would block for long,
     // disabled by default.
-    if (bootstrapIndex && 
!partitionLoadState.contains(hoodieKey.getPartitionPath())) {
+    if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) {
       // If the partition records are never loaded, load the records first.
-      loadRecords(hoodieKey.getPartitionPath());
+      loadRecords(partitionPath);
     }
     // Only changing records need looking up the index for the location,
     // append only records are always recognized as INSERT.
-    if (isChangingRecords && this.indexState.contains(hoodieKey)) {
+    if (isChangingRecords && indexState.contains(recordKey)) {
       // Set up the instant time as "U" to mark the bucket as an update bucket.
-      location = new HoodieRecordLocation("U", 
this.indexState.get(hoodieKey).getFileId());
-      this.bucketAssigner.addUpdate(record.getPartitionPath(), 
location.getFileId());
-    } else {
-      bucketInfo = this.bucketAssigner.addInsert(hoodieKey.getPartitionPath());
-      switch (bucketInfo.getBucketType()) {
-        case INSERT:
-          // This is an insert bucket, use HoodieRecordLocation instant time 
as "I".
-          // Downstream operators can then check the instant time to know 
whether
-          // a record belongs to an insert bucket.
-          location = new HoodieRecordLocation("I", 
bucketInfo.getFileIdPrefix());
-          break;
-        case UPDATE:
-          location = new HoodieRecordLocation("U", 
bucketInfo.getFileIdPrefix());
-          break;
-        default:
-          throw new AssertionError();
+      HoodieRecordGlobalLocation oldLoc = this.indexState.get(recordKey);
+      if (!StreamerUtil.equal(oldLoc.getPartitionPath(), partitionPath)) {
+        if (globalIndex) {
+          // if partition path changes, emit a delete record for old partition 
path,
+          // then update the index state using location with new partition 
path.
+          HoodieRecord<?> deleteRecord = new HoodieRecord<>(new 
HoodieKey(recordKey, oldLoc.getPartitionPath()),
+              payloadCreation.createDeletePayload((BaseAvroPayload) 
record.getData()));
+          deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
+          deleteRecord.seal();
+          out.collect((O) deleteRecord);
+        }
+        location = getNewRecordLocation(partitionPath);
+        updateIndexState(recordKey, partitionPath, location);
+      } else {
+        location = oldLoc.toLocal("U");
+        this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
       }
+    } else {
+      location = getNewRecordLocation(partitionPath);
       if (isChangingRecords) {
-        this.indexState.put(hoodieKey, location);
+        updateIndexState(recordKey, partitionPath, location);
       }
     }
     record.unseal();
@@ -206,6 +225,32 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
     out.collect((O) record);
   }
 
+  private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
+    final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
+    final HoodieRecordLocation location;
+    switch (bucketInfo.getBucketType()) {
+      case INSERT:
+        // This is an insert bucket, use HoodieRecordLocation instant time as 
"I".
+        // Downstream operators can then check the instant time to know whether
+        // a record belongs to an insert bucket.
+        location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
+        break;
+      case UPDATE:
+        location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
+        break;
+      default:
+        throw new AssertionError();
+    }
+    return location;
+  }
+
+  private void updateIndexState(
+      String recordKey,
+      String partitionPath,
+      HoodieRecordLocation localLoc) throws Exception {
+    this.indexState.put(recordKey, 
HoodieRecordGlobalLocation.fromLocal(partitionPath, localLoc));
+  }
+
   @Override
   public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
@@ -245,7 +290,8 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
           boolean shouldLoad = 
KeyGroupRangeAssignment.assignKeyToParallelOperator(
               hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID;
           if (shouldLoad) {
-            this.indexState.put(hoodieKey, new 
HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+            this.indexState.put(hoodieKey.getRecordKey(),
+                new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), 
baseFile.getCommitTime(), baseFile.getFileId()));
           }
         } catch (Exception e) {
           LOG.error("Error when putting record keys into the state from file: 
{}", baseFile);
@@ -265,7 +311,7 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
   @VisibleForTesting
   public boolean isKeyInState(HoodieKey hoodieKey) {
     try {
-      return this.indexState.contains(hoodieKey);
+      return this.indexState.contains(hoodieKey.getRecordKey());
     } catch (Exception e) {
       throw new HoodieException(e);
     }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
index 5bd3c68..fcf77db 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
@@ -18,16 +18,12 @@
 
 package org.apache.hudi.sink.transform;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.sink.utils.PayloadCreation;
 import org.apache.hudi.util.RowDataToAvroConverters;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -39,11 +35,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Constructor;
 
 /**
  * Function that transforms RowData to HoodieRecord.
@@ -116,53 +108,4 @@ public class RowDataToHoodieFunction<I extends RowData, O 
extends HoodieRecord<?
     HoodieRecordPayload payload = payloadCreation.createPayload(gr, isDelete);
     return new HoodieRecord<>(hoodieKey, payload);
   }
-
-  /**
-   * Util to create hoodie pay load instance.
-   */
-  private static class PayloadCreation implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private final boolean shouldCombine;
-    private final Constructor<?> constructor;
-    private final String preCombineField;
-
-    private PayloadCreation(
-        boolean shouldCombine,
-        Constructor<?> constructor,
-        @Nullable String preCombineField) {
-      this.shouldCombine = shouldCombine;
-      this.constructor = constructor;
-      this.preCombineField = preCombineField;
-    }
-
-    public static PayloadCreation instance(Configuration conf) throws 
Exception {
-      boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
-          || 
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == 
WriteOperationType.UPSERT;
-      String preCombineField = null;
-      final Class<?>[] argTypes;
-      final Constructor<?> constructor;
-      if (shouldCombine) {
-        preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
-        argTypes = new Class<?>[] {GenericRecord.class, Comparable.class};
-      } else {
-        argTypes = new Class<?>[] {Option.class};
-      }
-      final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS);
-      constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes);
-      return new PayloadCreation(shouldCombine, constructor, preCombineField);
-    }
-
-    public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean 
isDelete) throws Exception {
-      if (shouldCombine) {
-        ValidationUtils.checkState(preCombineField != null);
-        Comparable<?> orderingVal = (Comparable<?>) 
HoodieAvroUtils.getNestedFieldVal(record,
-            preCombineField, false);
-        return (HoodieRecordPayload<?>) constructor.newInstance(
-            isDelete ? null : record, orderingVal);
-      } else {
-        return (HoodieRecordPayload<?>) 
this.constructor.newInstance(Option.of(record));
-      }
-    }
-  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
new file mode 100644
index 0000000..831da25
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.BaseAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+
+/**
+ * Util to create hoodie payload instance.
+ */
+public class PayloadCreation implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final boolean shouldCombine;
+  private final Constructor<?> constructor;
+  private final String preCombineField;
+
+  private PayloadCreation(
+      boolean shouldCombine,
+      Constructor<?> constructor,
+      @Nullable String preCombineField) {
+    this.shouldCombine = shouldCombine;
+    this.constructor = constructor;
+    this.preCombineField = preCombineField;
+  }
+
+  public static PayloadCreation instance(Configuration conf) throws Exception {
+    boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+        || 
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == 
WriteOperationType.UPSERT;
+    String preCombineField = null;
+    final Class<?>[] argTypes;
+    final Constructor<?> constructor;
+    if (shouldCombine) {
+      preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
+      argTypes = new Class<?>[] {GenericRecord.class, Comparable.class};
+    } else {
+      argTypes = new Class<?>[] {Option.class};
+    }
+    final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS);
+    constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes);
+    return new PayloadCreation(shouldCombine, constructor, preCombineField);
+  }
+
+  public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean 
isDelete) throws Exception {
+    if (shouldCombine) {
+      ValidationUtils.checkState(preCombineField != null);
+      Comparable<?> orderingVal = (Comparable<?>) 
HoodieAvroUtils.getNestedFieldVal(record,
+          preCombineField, false);
+      return (HoodieRecordPayload<?>) constructor.newInstance(
+          isDelete ? null : record, orderingVal);
+    } else {
+      return (HoodieRecordPayload<?>) 
this.constructor.newInstance(Option.of(record));
+    }
+  }
+
+  public HoodieRecordPayload<?> createDeletePayload(BaseAvroPayload payload) 
throws Exception {
+    if (shouldCombine) {
+      return (HoodieRecordPayload<?>) constructor.newInstance(null, 
payload.orderingVal);
+    } else {
+      return (HoodieRecordPayload<?>) 
this.constructor.newInstance(Option.empty());
+    }
+  }
+}
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 41c587c..af87070 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -393,6 +393,56 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
   }
 
   @Test
+  void testWriteGlobalIndex() {
+    // the source generates 4 commits
+    String createSource = TestConfigurations.getFileSourceDDL(
+        "source", "test_source_4.data", 4);
+    streamTableEnv.executeSql(createSource);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", 
options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    final String insertInto2 = "insert into t1 select * from source";
+
+    execInsertSql(streamTableEnv, insertInto2);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result, "[id1,Phoebe,52,1970-01-01T00:00:08,par4]");
+  }
+
+  @Test
+  void testWriteLocalIndex() {
+    // the source generates 4 commits
+    String createSource = TestConfigurations.getFileSourceDDL(
+        "source", "test_source_4.data", 4);
+    streamTableEnv.executeSql(createSource);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "false");
+    options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", 
options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    final String insertInto2 = "insert into t1 select * from source";
+
+    execInsertSql(streamTableEnv, insertInto2);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
+    final String expected = "["
+        + "id1,Stephen,34,1970-01-01T00:00:02,par1, "
+        + "id1,Fabian,32,1970-01-01T00:00:04,par2, "
+        + "id1,Jane,19,1970-01-01T00:00:06,par3, "
+        + "id1,Phoebe,52,1970-01-01T00:00:08,par4]";
+    assertRowsEquals(result, expected, 3);
+  }
+
+  @Test
   void testStreamReadEmptyTablePath() throws Exception {
     // create an empty table
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 4a2466c..bb67661 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -256,8 +256,20 @@ public class TestData {
    * @param expected Expected string of the sorted rows
    */
   public static void assertRowsEquals(List<Row> rows, String expected) {
+    assertRowsEquals(rows, expected, 0);
+  }
+
+  /**
+   * Sort the {@code rows} using field at index {@code orderingPos} and asserts
+   * it equals with the expected string {@code expected}.
+   *
+   * @param rows     Actual result rows
+   * @param expected Expected string of the sorted rows
+   * @param orderingPos Field position for ordering
+   */
+  public static void assertRowsEquals(List<Row> rows, String expected, int 
orderingPos) {
     String rowsString = rows.stream()
-        .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0))))
+        .sorted(Comparator.comparing(o -> 
toStringSafely(o.getField(orderingPos))))
         .collect(Collectors.toList()).toString();
     assertThat(rowsString, is(expected));
   }
diff --git a/hudi-flink/src/test/resources/test_source_4.data 
b/hudi-flink/src/test/resources/test_source_4.data
new file mode 100644
index 0000000..1ed4d19
--- /dev/null
+++ b/hudi-flink/src/test/resources/test_source_4.data
@@ -0,0 +1,8 @@
+{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", 
"partition": "par1"}
+{"uuid": "id1", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", 
"partition": "par1"}
+{"uuid": "id1", "name": "Julian", "age": 54, "ts": "1970-01-01T00:00:03", 
"partition": "par2"}
+{"uuid": "id1", "name": "Fabian", "age": 32, "ts": "1970-01-01T00:00:04", 
"partition": "par2"}
+{"uuid": "id1", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", 
"partition": "par3"}
+{"uuid": "id1", "name": "Jane", "age": 19, "ts": "1970-01-01T00:00:06", 
"partition": "par3"}
+{"uuid": "id1", "name": "Ella", "age": 38, "ts": "1970-01-01T00:00:07", 
"partition": "par4"}
+{"uuid": "id1", "name": "Phoebe", "age": 52, "ts": "1970-01-01T00:00:08", 
"partition": "par4"}

Reply via email to