This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 783c9cb  [HUDI-2087] Support Append only in Flink stream (#3252)
783c9cb is described below

commit 783c9cb3693ff7d82cc3d43a4964c64a4da19436
Author: yuzhaojing <[email protected]>
AuthorDate: Sat Jul 10 14:49:35 2021 +0800

    [HUDI-2087] Support Append only in Flink stream (#3252)
    
    Co-authored-by: 喻兆靖 <[email protected]>
---
 .../hudi/table/action/commit/BucketType.java       |  2 +-
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 10 ++-
 .../java/org/apache/hudi/io/FlinkAppendHandle.java |  3 +-
 .../java/org/apache/hudi/io/FlinkCreateHandle.java | 12 +++
 .../commit/BaseFlinkCommitActionExecutor.java      |  5 +-
 .../apache/hudi/configuration/FlinkOptions.java    |  6 ++
 .../sink/partitioner/BucketAssignFunction.java     | 44 +++++++---
 .../hudi/sink/partitioner/BucketAssigner.java      | 10 ++-
 .../apache/hudi/streamer/FlinkStreamerConfig.java  | 11 ++-
 .../org/apache/hudi/table/HoodieTableFactory.java  |  6 ++
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 93 ++++++++++++++++++++--
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java | 15 ++++
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java | 20 ++++-
 .../test/java/org/apache/hudi/utils/TestData.java  | 33 ++++++++
 14 files changed, 242 insertions(+), 28 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
index 70ee473..e1fd161 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
@@ -19,5 +19,5 @@
 package org.apache.hudi.table.action.commit;
 
 public enum BucketType {
-  UPDATE, INSERT
+  UPDATE, INSERT, APPEND_ONLY
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 05e4481..71ca1b6 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -56,6 +56,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
 import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
 import org.apache.hudi.util.FlinkClientUtil;
@@ -408,6 +409,12 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     final HoodieRecordLocation loc = record.getCurrentLocation();
     final String fileID = loc.getFileId();
     final String partitionPath = record.getPartitionPath();
+    // append-only mode always uses FlinkCreateHandle
+    if (loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) {
+      return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
+          fileID, table.getTaskContextSupplier());
+    }
+
     if (bucketToHandles.containsKey(fileID)) {
       MiniBatchHandle lastHandle = (MiniBatchHandle) 
bucketToHandles.get(fileID);
       if (lastHandle.shouldReplace()) {
@@ -424,7 +431,8 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     if (isDelta) {
       writeHandle = new FlinkAppendHandle<>(config, instantTime, table, 
partitionPath, fileID, recordItr,
           table.getTaskContextSupplier());
-    } else if (loc.getInstantTime().equals("I")) {
+    } else if (loc.getInstantTime().equals(BucketType.INSERT.name()) || 
loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) {
+      // use the same handle for insert bucket
       writeHandle = new FlinkCreateHandle<>(config, instantTime, table, 
partitionPath,
           fileID, table.getTaskContextSupplier());
     } else {
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index 987f335..41d0666 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
+import org.apache.hudi.table.action.commit.BucketType;
 
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -80,7 +81,7 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, 
I, K, O>
   @Override
   protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
     return hoodieRecord.getCurrentLocation() != null
-        && hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
+        && 
hoodieRecord.getCurrentLocation().getInstantTime().equals(BucketType.UPDATE.name());
   }
 
   @Override
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index 3ff579f..49d8918 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -30,6 +30,7 @@ import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.MarkerFiles;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -70,6 +71,17 @@ public class FlinkCreateHandle<T extends 
HoodieRecordPayload, I, K, O>
     }
   }
 
+  @Override
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    // In some rare cases, the task is restarted with the same write file name,
+    // e.g. when reusing the small log files from the last commit instant.
+
+    // Just skip the marker file creation if it already exists; the new data would be appended to
+    // the file directly.
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
+  }
+
   /**
    * The flink checkpoints start in sequence and asynchronously, when one 
write task finish the checkpoint(A)
    * (thus the fs view got the written data files some of which may be 
invalid),
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 5cfd28b..7dcc240 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -102,9 +102,7 @@ public abstract class BaseFlinkCommitActionExecutor<T 
extends HoodieRecordPayloa
     final HoodieRecord<?> record = inputRecords.get(0);
     final String partitionPath = record.getPartitionPath();
     final String fileId = record.getCurrentLocation().getFileId();
-    final BucketType bucketType = 
record.getCurrentLocation().getInstantTime().equals("I")
-        ? BucketType.INSERT
-        : BucketType.UPDATE;
+    final BucketType bucketType = 
BucketType.valueOf(record.getCurrentLocation().getInstantTime());
     handleUpsertPartition(
         instantTime,
         partitionPath,
@@ -185,6 +183,7 @@ public abstract class BaseFlinkCommitActionExecutor<T 
extends HoodieRecordPayloa
       } else {
         switch (bucketType) {
           case INSERT:
+          case APPEND_ONLY:
             return handleInsert(fileIdHint, recordItr);
           case UPDATE:
             return handleUpdate(partitionPath, fileIdHint, recordItr);
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java 
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index cd309d1..1af4459 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -186,6 +186,12 @@ public class FlinkOptions {
       .defaultValue(TABLE_TYPE_COPY_ON_WRITE)
       .withDescription("Type of table to write. COPY_ON_WRITE (or) 
MERGE_ON_READ");
 
+  public static final ConfigOption<Boolean> APPEND_ONLY_ENABLE = ConfigOptions
+          .key("append_only.enable")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription("Whether to write data to new baseFile without 
index, only support in COW, default false");
+
   public static final ConfigOption<String> OPERATION = ConfigOptions
       .key("write.operation")
       .stringType()
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 75a3454..5cc239b 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -28,11 +28,13 @@ import 
org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.bootstrap.IndexRecord;
 import org.apache.hudi.sink.utils.PayloadCreation;
 import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -55,9 +57,9 @@ import java.util.Objects;
  * it then assigns the bucket with ID using the {@link BucketAssigner}.
  *
  * <p>All the records are tagged with HoodieRecordLocation, instead of real 
instant time,
- * INSERT record uses "I" and UPSERT record uses "U" as instant time. There is 
no need to keep
+ * INSERT record uses "INSERT" and UPSERT record uses "UPDATE" as instant 
time. There is no need to keep
  * the "real" instant time for each record, the bucket ID (partition path & 
fileID) actually decides
- * where the record should write to. The "I" and "U" tags are only used for 
downstream to decide whether
+ * where the record should write to. The "INSERT" and "UPDATE" tags are only 
used for downstream to decide whether
  * the data bucket is an INSERT or an UPSERT, we should factor the tags out 
when the underneath writer
  * supports specifying the bucket type explicitly.
  *
@@ -106,11 +108,18 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
    */
   private final boolean globalIndex;
 
+  private final boolean appendOnly;
+
   public BucketAssignFunction(Configuration conf) {
     this.conf = conf;
     this.isChangingRecords = WriteOperationType.isChangingRecords(
         WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
     this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
+    this.appendOnly = conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE);
+    if (appendOnly) {
+      
ValidationUtils.checkArgument(conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.COPY_ON_WRITE.name()),
+          "APPEND_ONLY mode only support in COPY_ON_WRITE table");
+    }
   }
 
   @Override
@@ -170,25 +179,33 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
     final String partitionPath = hoodieKey.getPartitionPath();
     final HoodieRecordLocation location;
 
+    if (appendOnly) {
+      location = getNewRecordLocation(partitionPath);
+      this.context.setCurrentKey(recordKey);
+      record.setCurrentLocation(location);
+      out.collect((O) record);
+      return;
+    }
+
     // Only changing records need looking up the index for the location,
     // append only records are always recognized as INSERT.
     HoodieRecordGlobalLocation oldLoc = indexState.value();
     if (isChangingRecords && oldLoc != null) {
-      // Set up the instant time as "U" to mark the bucket as an update bucket.
+      // Set up the instant time as "UPDATE" to mark the bucket as an update 
bucket.
       if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
         if (globalIndex) {
           // if partition path changes, emit a delete record for old partition 
path,
           // then update the index state using location with new partition 
path.
           HoodieRecord<?> deleteRecord = new HoodieRecord<>(new 
HoodieKey(recordKey, oldLoc.getPartitionPath()),
               payloadCreation.createDeletePayload((BaseAvroPayload) 
record.getData()));
-          deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
+          
deleteRecord.setCurrentLocation(oldLoc.toLocal(BucketType.UPDATE.name()));
           deleteRecord.seal();
           out.collect((O) deleteRecord);
         }
         location = getNewRecordLocation(partitionPath);
         updateIndexState(partitionPath, location);
       } else {
-        location = oldLoc.toLocal("U");
+        location = oldLoc.toLocal(BucketType.UPDATE.name());
         this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
       }
     } else {
@@ -203,17 +220,26 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
   }
 
   private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
-    final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
+    BucketInfo bucketInfo;
+    if (appendOnly) {
+      bucketInfo = this.bucketAssigner.addAppendOnly(partitionPath);
+    } else {
+      bucketInfo = this.bucketAssigner.addInsert(partitionPath);
+    }
+
     final HoodieRecordLocation location;
     switch (bucketInfo.getBucketType()) {
       case INSERT:
-        // This is an insert bucket, use HoodieRecordLocation instant time as 
"I".
+        // This is an insert bucket, use HoodieRecordLocation instant time as 
"INSERT".
         // Downstream operators can then check the instant time to know whether
         // a record belongs to an insert bucket.
-        location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
+        location = new HoodieRecordLocation(BucketType.INSERT.name(), 
bucketInfo.getFileIdPrefix());
         break;
       case UPDATE:
-        location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
+        location = new HoodieRecordLocation(BucketType.UPDATE.name(), 
bucketInfo.getFileIdPrefix());
+        break;
+      case APPEND_ONLY:
+        location = new HoodieRecordLocation(BucketType.APPEND_ONLY.name(), 
bucketInfo.getFileIdPrefix());
         break;
       default:
         throw new AssertionError();
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index 6d805ce..965b557 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -140,6 +140,14 @@ public class BucketAssigner implements AutoCloseable {
     }
 
     // if we have anything more, create new insert buckets, like normal
+    return getOrCreateNewFileBucket(partitionPath, BucketType.INSERT);
+  }
+
+  public BucketInfo addAppendOnly(String partitionPath) {
+    return getOrCreateNewFileBucket(partitionPath, BucketType.APPEND_ONLY);
+  }
+
+  private BucketInfo getOrCreateNewFileBucket(String partitionPath, BucketType 
bucketType) {
     if (newFileAssignStates.containsKey(partitionPath)) {
       NewFileAssignState newFileAssignState = 
newFileAssignStates.get(partitionPath);
       if (newFileAssignState.canAssign()) {
@@ -148,7 +156,7 @@ public class BucketAssigner implements AutoCloseable {
       final String key = StreamerUtil.generateBucketKey(partitionPath, 
newFileAssignState.fileId);
       return bucketInfoMap.get(key);
     }
-    BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, 
FSUtils.createNewFileIdPfx(), partitionPath);
+    BucketInfo bucketInfo = new BucketInfo(bucketType, 
FSUtils.createNewFileIdPfx(), partitionPath);
     final String key = StreamerUtil.generateBucketKey(partitionPath, 
bucketInfo.getFileIdPrefix());
     bucketInfoMap.put(key, bucketInfo);
     newFileAssignStates.put(partitionPath, new 
NewFileAssignState(bucketInfo.getFileIdPrefix(), 
writeProfile.getRecordsPerBucket()));
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java 
b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 0b4533f..c024d7c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -69,6 +69,9 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--table-type"}, description = "Type of table. 
COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
   public String tableType;
 
+  @Parameter(names = {"--append-only"}, description = "Write data to new 
parquet in every checkpoint. Only support in COPY_ON_WRITE table.", required = 
true)
+  public Boolean appendOnly = false;
+
   @Parameter(names = {"--props"}, description = "Path to properties file on 
localfs or dfs, with configurations for "
       + "hoodie client, schema provider, key generator and data source. For 
hoodie client props, sane defaults are "
       + "used, but recommend use to provide basic things like metrics 
endpoints, hive configs etc. For sources, refer"
@@ -290,7 +293,13 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
     // copy_on_write works same as COPY_ON_WRITE
     conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
-    conf.setString(FlinkOptions.OPERATION, config.operation.value());
+    conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, config.appendOnly);
+    if (config.appendOnly) {
+      // append only should use insert operation
+      conf.setString(FlinkOptions.OPERATION, 
WriteOperationType.INSERT.value());
+    } else {
+      conf.setString(FlinkOptions.OPERATION, config.operation.value());
+    }
     conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
     conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
     conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 2eeb8f5..2c2193c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -144,6 +145,11 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
       TableSchema schema) {
     // table name
     conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
+    // append only
+    if (conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE)) {
+      // append only should use insert operation
+      conf.setString(FlinkOptions.OPERATION, 
WriteOperationType.INSERT.value());
+    }
     // hoodie key about options
     setupHoodieKeyOptions(conf, table);
     // cleaning options
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java 
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index e145076..4344f45 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
@@ -44,6 +45,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -387,12 +389,12 @@ public class TestWriteCopyOnWrite {
   @Test
   public void testInsertWithMiniBatches() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch 
size
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch 
size
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), 
conf);
 
     // open the function and ingest data
     funcWrapper.openFunction();
-    // Each record is 208 bytes. so 4 records expect to trigger a mini-batch 
write
+    // Each record is 216 bytes. so 4 records expect to trigger a mini-batch 
write
     for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
       funcWrapper.invoke(rowData);
     }
@@ -448,13 +450,13 @@ public class TestWriteCopyOnWrite {
   @Test
   public void testInsertWithDeduplication() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch 
size
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch 
size
     conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), 
conf);
 
     // open the function and ingest data
     funcWrapper.openFunction();
-    // Each record is 208 bytes. so 4 records expect to trigger a mini-batch 
write
+    // Each record is 216 bytes. so 4 records expect to trigger a mini-batch 
write
     for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
       funcWrapper.invoke(rowData);
     }
@@ -510,14 +512,91 @@ public class TestWriteCopyOnWrite {
   }
 
   @Test
+  public void testAppendOnly() throws Exception {
+    // reset the config option
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch 
size
+    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, false);
+    conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
+    conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), 
conf);
+
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    // Each record is 216 bytes. so 4 records expect to trigger a mini-batch 
write
+    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+      funcWrapper.invoke(rowData);
+    }
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+    Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
+    assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+
+    final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the 
first event first
+    final OperatorEvent event2 = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", event2, 
instanceOf(WriteMetadataEvent.class));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the 
event");
+
+    String instant = funcWrapper.getWriteClient()
+            .getLastPendingInstant(getTableType());
+
+    funcWrapper.checkpointComplete(1);
+
+    Map<String, List<String>> expected = new HashMap<>();
+
+    expected.put("par1", Arrays.asList(
+        "id1,par1,id1,Danny,23,0,par1",
+        "id1,par1,id1,Danny,23,1,par1",
+        "id1,par1,id1,Danny,23,2,par1",
+        "id1,par1,id1,Danny,23,3,par1",
+        "id1,par1,id1,Danny,23,4,par1"));
+
+    TestData.checkWrittenAllData(tempFile, expected, 1);
+
+    // started a new instant already
+    checkInflightInstant(funcWrapper.getWriteClient());
+    checkInstantState(funcWrapper.getWriteClient(), 
HoodieInstant.State.COMPLETED, instant);
+
+    // insert duplicates again
+    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+      funcWrapper.invoke(rowData);
+    }
+
+    funcWrapper.checkpointFunction(2);
+
+    final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the 
first event first
+    final OperatorEvent event4 = funcWrapper.getNextEvent();
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
+    funcWrapper.checkpointComplete(2);
+
+    // Append-only mode keeps duplicates: every record is now expected twice.
+    expected.put("par1", Arrays.asList(
+        "id1,par1,id1,Danny,23,0,par1",
+        "id1,par1,id1,Danny,23,0,par1",
+        "id1,par1,id1,Danny,23,1,par1",
+        "id1,par1,id1,Danny,23,1,par1",
+        "id1,par1,id1,Danny,23,2,par1",
+        "id1,par1,id1,Danny,23,2,par1",
+        "id1,par1,id1,Danny,23,3,par1",
+        "id1,par1,id1,Danny,23,3,par1",
+        "id1,par1,id1,Danny,23,4,par1",
+        "id1,par1,id1,Danny,23,4,par1"));
+    TestData.checkWrittenAllData(tempFile, expected, 1);
+  }
+
+  @Test
   public void testInsertWithSmallBufferSize() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes 
buffer size
+    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0007); // 734 bytes 
buffer size
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), 
conf);
 
     // open the function and ingest data
     funcWrapper.openFunction();
-    // each record is 208 bytes. so 4 records expect to trigger buffer flush:
+    // each record is 216 bytes. so 4 records expect to trigger buffer flush:
     // flush the max size bucket once at a time.
     for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
       funcWrapper.invoke(rowData);
@@ -660,7 +739,7 @@ public class TestWriteCopyOnWrite {
   public void testWriteExactlyOnce() throws Exception {
     // reset the config option
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3);
-    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes 
buffer size
+    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0007); // 734 bytes 
buffer size
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), 
conf);
 
     // open the function and ingest data
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java 
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 07e23b5..b983e8c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -23,12 +23,14 @@ import 
org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestData;
@@ -37,6 +39,7 @@ import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Comparator;
@@ -44,6 +47,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
 /**
  * Test cases for delta stream write.
  */
@@ -86,6 +91,16 @@ public class TestWriteMergeOnRead extends 
TestWriteCopyOnWrite {
     return EXPECTED1;
   }
 
+  @Test
+  public void testAppendOnly() throws Exception {
+    conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
+    conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), 
conf);
+    assertThrows(IllegalArgumentException.class, () -> {
+      funcWrapper.openFunction();
+    }, "APPEND_ONLY mode only support in COPY_ON_WRITE table");
+  }
+
   protected Map<String, String> getMiniBatchExpected() {
     Map<String, String> expected = new HashMap<>();
     // MOR mode merges the messages with the same key.
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
 
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 13a71ec..bd6d3e3 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -19,15 +19,18 @@
 package org.apache.hudi.sink;
 
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
 
 import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
 /**
  * Test cases for delta stream write with compaction.
  */
@@ -39,10 +42,19 @@ public class TestWriteMergeOnReadWithCompact extends 
TestWriteCopyOnWrite {
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
   }
 
-  @Disabled
+  @Override
+  protected Map<String, String> getExpectedBeforeCheckpointComplete() {
+    return EXPECTED1;
+  }
+
   @Test
-  public void testIndexStateBootstrap() {
-    // Ignore the index bootstrap because we only support parquet load now.
+  public void testAppendOnly() throws Exception {
+    conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
+    conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), 
conf);
+    assertThrows(IllegalArgumentException.class, () -> {
+      funcWrapper.openFunction();
+    }, "APPEND_ONLY mode only support in COPY_ON_WRITE table");
   }
 
   protected Map<String, String> getMiniBatchExpected() {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 50ecf54..2de4859 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -360,8 +360,10 @@ public class TestData {
     assert baseFile.isDirectory();
     FileFilter filter = file -> !file.getName().startsWith(".");
     File[] partitionDirs = baseFile.listFiles(filter);
+
     assertNotNull(partitionDirs);
     assertThat(partitionDirs.length, is(partitions));
+
     for (File partitionDir : partitionDirs) {
       File[] dataFiles = partitionDir.listFiles(filter);
       assertNotNull(dataFiles);
@@ -381,6 +383,37 @@ public class TestData {
     }
   }
 
+  public static void checkWrittenAllData(
+      File baseFile,
+      Map<String, List<String>> expected,
+      int partitions) throws IOException {
+    assert baseFile.isDirectory();
+    FileFilter filter = file -> !file.getName().startsWith(".");
+    File[] partitionDirs = baseFile.listFiles(filter);
+
+    assertNotNull(partitionDirs);
+    assertThat(partitionDirs.length, is(partitions));
+
+    for (File partitionDir : partitionDirs) {
+      File[] dataFiles = partitionDir.listFiles(filter);
+      assertNotNull(dataFiles);
+
+      List<String> readBuffer = new ArrayList<>();
+      for (File dataFile : dataFiles) {
+        ParquetReader<GenericRecord> reader = AvroParquetReader
+            .<GenericRecord>builder(new 
Path(dataFile.getAbsolutePath())).build();
+        GenericRecord nextRecord = reader.read();
+        while (nextRecord != null) {
+          readBuffer.add(filterOutVariables(nextRecord));
+          nextRecord = reader.read();
+        }
+        readBuffer.sort(Comparator.naturalOrder());
+      }
+
+      assertThat(readBuffer, is(expected.get(partitionDir.getName())));
+    }
+  }
+
   /**
    * Checks the source data are written as expected.
    *

Reply via email to