yanghua commented on a change in pull request #2553:
URL: https://github.com/apache/hudi/pull/2553#discussion_r572611335



##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieMergeHandle} that supports create write 
incrementally(mini-batches).

Review comment:
       A `HoodieCreateHandle`?

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Create handle factory for Flink writer, use the specified write handle 
directly.
+ */
+public class ExplicitCreateHandleFactory<T extends HoodieRecordPayload, I, K, 
O>

Review comment:
       So what's the difference between `FlinkCreateHandleFactory` and 
`ExplicitCreateHandleFactory`? Is `FlinkCreateHandleFactory` useless?

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieMergeHandle} that supports merge write 
incrementally(mini-batches).
+ *
+ * <p>For a new mini-batch, it initialize and set up the next file path to 
write,
+ * and closes the file path when the mini-batch write finish. When next 
mini-batch
+ * write starts, it rolls over to another new file. If all the mini-batches 
write finish
+ * for a checkpoint round, it renames the last new file path as the desired 
file name
+ * (name with the expected file ID).
+ *

Review comment:
       IMO, can we ignore the concept of the `mini-batch` in the 
implementation (interface), making it a default mechanism?

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -61,14 +62,117 @@ protected HoodieFlinkCopyOnWriteTable(HoodieWriteConfig 
config, HoodieEngineCont
     super(config, context, metaClient);
   }
 
+  /**
+   * Upsert a batch of new records into Hoodie table at the supplied 
instantTime.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * the underneath file.
+   *
+   * @param context     HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant Time for the action
+   * @param records     hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> upsert(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> records) {
+    return new FlinkUpsertCommitActionExecutor<>(context, writeHandle, config, 
this, instantTime, records).execute();
+  }
+
+  /**
+   * Insert a batch of new records into Hoodie table at the supplied 
instantTime.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * the underneath file.
+   *
+   * @param context     HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant Time for the action
+   * @param records     hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> insert(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> records) {
+    return new FlinkInsertCommitActionExecutor<>(context, writeHandle, config, 
this, instantTime, records).execute();
+  }
+
+  /**
+   * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the 
supplied instantTime {@link HoodieKey}s will be
+   * de-duped and non existent keys will be removed before deleting.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * the underneath file.
+   *
+   * @param context     HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant Time for the action
+   * @param keys   {@link List} of {@link HoodieKey}s to be deleted
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> delete(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieKey> keys) {
+    return new FlinkDeleteCommitActionExecutor<>(context, writeHandle, config, 
this, instantTime, keys).execute();
+  }
+
+  /**
+   * Upserts the given prepared records into the Hoodie table, at the supplied 
instantTime.
+   *
+   * <p>This implementation requires that the input records are already 
tagged, and de-duped if needed.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * the underneath file.
+   *
+   * @param context    HoodieEngineContext
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords  hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> preppedRecords) {
+    return new FlinkUpsertPreppedCommitActionExecutor<>(context, writeHandle, 
config, this, instantTime, preppedRecords).execute();
+  }
+
+  /**
+   * Inserts the given prepared records into the Hoodie table, at the supplied 
instantTime.
+   *
+   * <p>This implementation requires that the input records are already 
tagged, and de-duped if needed.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * the underneath file.
+   *
+   * @param context    HoodieEngineContext
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords  hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> preppedRecords) {
+    return new FlinkInsertPreppedCommitActionExecutor<>(context, writeHandle, 
config, this, instantTime, preppedRecords).execute();
+  }
+
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext 
context, String instantTime, List<HoodieRecord<T>> records) {
-    return new FlinkUpsertCommitActionExecutor<>(context, config, this, 
instantTime, records).execute();
+    throw new IllegalAccessError("This method should not be invoked");

Review comment:
       Maybe `UnsupportedOperationException` is more reasonable?

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieMergeHandle} that supports merge write 
incrementally(mini-batches).
+ *
+ * <p>For a new mini-batch, it initialize and set up the next file path to 
write,
+ * and closes the file path when the mini-batch write finish. When next 
mini-batch
+ * write starts, it rolls over to another new file. If all the mini-batches 
write finish
+ * for a checkpoint round, it renames the last new file path as the desired 
file name
+ * (name with the expected file ID).
+ *
+ * @param <T> Payload type
+ * @param <I> Input type
+ * @param <K> Key type
+ * @param <O> Output type
+ */
+public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieMergeHandle<T, I, K, O>
+    implements MiniBatchHandle {
+
+  private static final Logger LOG = 
LogManager.getLogger(FlinkMergeHandle.class);
+
+  /**
+   * Records the current file handles number that rolls over.
+   */
+  private int rollNumber = 0;
+  /**
+   * Records the rolled over file paths.
+   */
+  private List<Path> rolloverPaths;
+  /**
+   * Whether it is the first time to generate file handle, E.G. the handle has 
not rolled over yet.
+   */
+  private boolean needBootStrap = true;
+
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                          Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
+                          TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier);
+    rolloverPaths = new ArrayList<>();
+  }
+
+  /**
+   * Called by compactor code path.
+   */
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                          Map<String, HoodieRecord<T>> keyToNewRecords, String 
partitionPath, String fileId,
+                          HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, 
fileId,
+        dataFileToBeMerged, taskContextSupplier);
+  }
+
+  /**
+   * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
+   */
+  protected String generatesDataFileName() {
+    return FSUtils.makeDataFileName(instantTime, writeToken, fileId + "-" + 
rollNumber, hoodieTable.getBaseFileExtension());
+  }
+
+  public boolean isNeedBootStrap() {
+    return needBootStrap;
+  }
+
+  @Override
+  public List<WriteStatus> close() {
+    List<WriteStatus> writeStatus = super.close();
+    this.needBootStrap = false;
+    return writeStatus;
+  }
+
+  /**
+   * The difference with the parent method is that there is no need to set up
+   * locations for the records.
+   *
+   * @param fileId        The file ID
+   * @param newRecordsItr The incremental records iterator
+   */
+  @Override
+  protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
+    try {
+      // Load the new records in a map
+      long memoryForMerge = 
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
+      LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
+      this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, 
config.getSpillableMapBasePath(),
+          new DefaultSizeEstimator(), new 
HoodieRecordSizeEstimator(writerSchema));
+    } catch (IOException io) {
+      throw new HoodieIOException("Cannot instantiate an 
ExternalSpillableMap", io);
+    }

Review comment:
       At least, this segment is the same, right? Let's reduce the duplicated 
code.

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieMergeHandle} that supports create write 
incrementally(mini-batches).
+ *
+ * <p>For the first mini-batch, it initialize and set up the next file path to 
write,
+ * but does not close the file writer until all the mini-batches write finish. 
Each mini-batch
+ * data are appended to the same file.
+ *
+ * @param <T> Payload type
+ * @param <I> Input type
+ * @param <K> Key type
+ * @param <O> Output type
+ */
+public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {
+
+  private static final Logger LOG = 
LogManager.getLogger(FlinkCreateHandle.class);
+  private long lastFileSize = 0L;
+
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, 
TaskContextSupplier taskContextSupplier) {
+    this(config, instantTime, hoodieTable, partitionPath, fileId, 
getWriterSchemaIncludingAndExcludingMetadataPair(config),
+        taskContextSupplier);
+  }
+
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, Pair<Schema, 
Schema> writerSchemaIncludingAndExcludingMetadataPair,
+                           TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, 
writerSchemaIncludingAndExcludingMetadataPair,
+        taskContextSupplier);
+  }
+
+  /**
+   * Called by the compactor code path.
+   */
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, Map<String, 
HoodieRecord<T>> recordMap,
+                           TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, 
taskContextSupplier);
+  }
+
+  /**
+   * Get the incremental write status. In mini-batch write mode,
+   * this handle would be reused for a checkpoint bucket(the bucket is 
appended as mini-batches),
+   * thus, after a mini-batch append finish, we do not close the underneath 
writer but return
+   * the incremental WriteStatus instead.
+   *
+   * @return the incremental write status
+   */
+  private WriteStatus incWriteStatus() {

Review comment:
       Let's rename it to `getIncrementalWriteStatus`? The prefix `inc` may 
be associated with `increase`, which would confuse users.

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieMergeHandle} that supports merge write 
incrementally(mini-batches).
+ *
+ * <p>For a new mini-batch, it initialize and set up the next file path to 
write,
+ * and closes the file path when the mini-batch write finish. When next 
mini-batch
+ * write starts, it rolls over to another new file. If all the mini-batches 
write finish
+ * for a checkpoint round, it renames the last new file path as the desired 
file name
+ * (name with the expected file ID).
+ *
+ * @param <T> Payload type
+ * @param <I> Input type
+ * @param <K> Key type
+ * @param <O> Output type
+ */
+public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieMergeHandle<T, I, K, O>
+    implements MiniBatchHandle {
+
+  private static final Logger LOG = 
LogManager.getLogger(FlinkMergeHandle.class);
+
+  /**
+   * Records the current file handles number that rolls over.
+   */
+  private int rollNumber = 0;
+  /**
+   * Records the rolled over file paths.
+   */
+  private List<Path> rolloverPaths;
+  /**
+   * Whether it is the first time to generate file handle, E.G. the handle has 
not rolled over yet.
+   */
+  private boolean needBootStrap = true;
+
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                          Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
+                          TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier);
+    rolloverPaths = new ArrayList<>();
+  }
+
+  /**
+   * Called by compactor code path.
+   */
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                          Map<String, HoodieRecord<T>> keyToNewRecords, String 
partitionPath, String fileId,
+                          HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, 
fileId,
+        dataFileToBeMerged, taskContextSupplier);
+  }
+
+  /**
+   * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
+   */
+  protected String dataFileName() {
+    return FSUtils.makeDataFileName(instantTime, writeToken, fileId + "-" + 
rollNumber, hoodieTable.getBaseFileExtension());
+  }
+
+  public boolean isNeedBootStrap() {
+    return needBootStrap;
+  }
+
+  @Override
+  public List<WriteStatus> close() {
+    List<WriteStatus> writeStatus = super.close();
+    this.needBootStrap = false;
+    return writeStatus;
+  }
+
+  /**
+   * THe difference with the parent method is that there is no need to set up
+   * locations for the records.
+   *
+   * @param fileId        The file ID
+   * @param newRecordsItr The incremental records iterator
+   */
+  @Override
+  protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
+    try {
+      // Load the new records in a map
+      long memoryForMerge = 
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
+      LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
+      this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, 
config.getSpillableMapBasePath(),
+          new DefaultSizeEstimator(), new 
HoodieRecordSizeEstimator(writerSchema));
+    } catch (IOException io) {
+      throw new HoodieIOException("Cannot instantiate an 
ExternalSpillableMap", io);
+    }
+    while (newRecordsItr.hasNext()) {
+      HoodieRecord<T> record = newRecordsItr.next();
+      // NOTE: Once Records are added to map (spillable-map), DO NOT change it 
as they won't persist
+      keyToNewRecords.put(record.getRecordKey(), record);
+    }
+    LOG.info("Number of entries in MemoryBasedMap => "
+        + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
+        + "Total size in bytes of MemoryBasedMap => "
+        + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() 
+ "Number of entries in DiskBasedMap => "
+        + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() 
+ "Size of file spilled to disk => "
+        + ((ExternalSpillableMap) 
keyToNewRecords).getSizeOfFileOnDiskInBytes());
+  }
+
+  /**
+   *
+   * Rolls over the write handle to prepare for the next batch write.
+   *
+   * <p>It tweaks the handle state as following:
+   *
+   * <ul>
+   *   <li>Increment the {@code rollNumber}</li>
+   *   <li>Book-keep the last file path, these files (except the last one) are 
temporary that need to be cleaned</li>
+   *   <li>Make the last new file path as old</li>
+   *   <li>Initialize the new file path and file writer</li>
+   * </ul>
+   *
+   * @param newRecordsItr The records iterator to update
+   */
+  public void rollOver(Iterator<HoodieRecord<T>> newRecordsItr) {
+    init(this.fileId, newRecordsItr);
+    this.recordsWritten = 0;
+    this.recordsDeleted = 0;
+    this.updatedRecordsWritten = 0;
+    this.insertRecordsWritten = 0;
+    this.writeStatus.setTotalErrorRecords(0);
+    this.timer = new HoodieTimer().startTimer();
+
+    rollNumber++;
+
+    rolloverPaths.add(newFilePath);
+    oldFilePath = newFilePath;
+    // Use the fileId + "-" + rollNumber as the new fileId of a mini-batch 
write.
+    String newFileName = dataFileName();
+    String relativePath = new Path((partitionPath.isEmpty() ? "" : 
partitionPath + "/")
+        + newFileName).toString();
+    newFilePath = new Path(config.getBasePath(), relativePath);
+
+    try {
+      fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, 
config, writerSchemaWithMetafields, taskContextSupplier);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error when creating file writer for path " 
+ newFilePath, e);
+    }
+
+    LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", 
oldFilePath.toString(),
+        newFilePath.toString()));
+  }
+
+  public void finishWrite() {
+    for (int i = 0; i < rolloverPaths.size() - 1; i++) {
+      Path path = rolloverPaths.get(i);
+      try {
+        fs.delete(path, false);
+      } catch (IOException e) {
+        throw new HoodieIOException("Error when clean the temporary roll file: 
" + path, e);
+      }
+    }
+    Path lastPath = rolloverPaths.size() > 0
+        ? rolloverPaths.get(rolloverPaths.size() - 1)
+        : newFilePath;
+    String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, 
fileId, hoodieTable.getBaseFileExtension());
+    String relativePath = new Path((partitionPath.isEmpty() ? "" : 
partitionPath + "/")
+        + newFileName).toString();
+    final Path desiredPath = new Path(config.getBasePath(), relativePath);
+    try {
+      fs.rename(lastPath, desiredPath);

Review comment:
       You may try to create a new file instead of renaming the current file? 
Refer to `TimelineLayoutV1`?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -293,4 +295,9 @@ public static void initTableIfNotExists(Configuration conf) 
throws IOException {
   public static String generateBucketKey(String partitionPath, String fileId) {
     return String.format("%s_%s", partitionPath, fileId);
   }
+
+  /** Returns whether the location represents an insert. */
+  public static boolean isInsert(HoodieRecordLocation loc) {
+    return Objects.equals(loc.getInstantTime(), "I");

Review comment:
       Is `String#equals()`  not suitable?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to