yanghua commented on a change in pull request #2808:
URL: https://github.com/apache/hudi/pull/2808#discussion_r612203287



##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -322,4 +326,12 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti
 
     return new HoodieFlinkWriteClient<>(context, getHoodieClientConfig(conf));
   }
+
+  public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {

Review comment:
       Can we extract this into `CommitUtils`?
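A hedged sketch of what the extracted helper could look like in `CommitUtils`. The enum values and action-type strings below are simplified stand-ins for Hudi's actual `WriteOperationType`, `HoodieTableType`, and timeline action constants, so treat this as an illustration of the extraction, not the real Hudi API:

```java
// Sketch only: simplified stand-ins for Hudi's WriteOperationType and HoodieTableType.
enum WriteOperationType { INSERT, UPSERT, INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE }

enum HoodieTableType { COPY_ON_WRITE, MERGE_ON_READ }

class CommitUtilsSketch {
  // Hypothetical extracted helper: resolves the commit action type
  // from the write operation and the table type.
  static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
    switch (operation) {
      case INSERT_OVERWRITE:
      case INSERT_OVERWRITE_TABLE:
        // Overwrite operations replace existing file groups, so they
        // produce a replace commit regardless of the table type.
        return "replacecommit";
      default:
        // Otherwise the action type depends only on the table type.
        return tableType == HoodieTableType.MERGE_ON_READ ? "deltacommit" : "commit";
    }
  }
}
```

With the helper in a shared utility class, both the Flink writer and other engines can resolve the action type without duplicating the switch.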

##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
##########
@@ -130,6 +133,20 @@ public String asSummaryString() {
 
   @Override
   public void applyStaticPartition(Map<String, String> map) {
-    // no operation
+    // #applyOverwrite should have been invoked.
+    if (this.overwrite) {
+      final String operationType;
+      if (map.size() > 0) {

Review comment:
       Can we give this variable (`map`) a more readable name?
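A sketch of the suggested rename. The name `partitions` is a hypothetical suggestion (any descriptive name works), and `isEmpty()` replaces the `size() > 0` check for readability; the operation-type strings are illustrative placeholders:

```java
import java.util.Map;

// Sketch of the suggested rename: `map` -> `partitions` (hypothetical name).
class OverwriteOperationResolver {
  // Decides the overwrite operation type from the static partition spec:
  // a partition-level overwrite when static partitions are given,
  // a whole-table overwrite otherwise.
  static String resolveOperationType(Map<String, String> partitions) {
    return partitions.isEmpty() ? "insert_overwrite_table" : "insert_overwrite";
  }
}
```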

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
##########
@@ -33,19 +33,24 @@ private BucketAssigners() {}
   /**
    * Creates a {@code BucketAssigner}.
    *
-   * @param taskID    The task ID
-   * @param numTasks  The number of tasks
-   * @param tableType The table type
-   * @param context   The engine context
-   * @param config    The configuration
+   * @param taskID      The task ID

Review comment:
       Can you disable automatic formatting in your local IDE? This hunk only re-aligns the Javadoc `@param` columns.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
##########
@@ -401,4 +422,26 @@ public OperatorCoordinator create(Context context) {
       return new StreamWriteOperatorCoordinator(this.conf, context);
     }
   }
+
+  /**
+   * Remember some table state variables.
+   */
+  private static class TableState implements Serializable {

Review comment:
       Shall we make this class follow the POJO specification?
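A hedged sketch of a POJO-style `TableState`: private fields, a no-arg constructor, and getter/setter accessors, which keeps the class compatible with POJO-based serialization tooling. The field names below are illustrative guesses, not the actual fields of the class under review:

```java
import java.io.Serializable;

// Illustrative POJO sketch; the fields are hypothetical, not taken
// from the class under review.
class TableState implements Serializable {
  private static final long serialVersionUID = 1L;

  private String commitAction; // hypothetical field
  private boolean overwrite;   // hypothetical field

  // POJO convention: a no-arg constructor.
  TableState() {
  }

  String getCommitAction() {
    return commitAction;
  }

  void setCommitAction(String commitAction) {
    this.commitAction = commitAction;
  }

  boolean isOverwrite() {
    return overwrite;
  }

  void setOverwrite(boolean overwrite) {
    this.overwrite = overwrite;
  }
}
```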

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
##########
@@ -137,6 +149,8 @@ public void start() throws Exception {
     reset();
     // writeClient

Review comment:
       ditto

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
##########
@@ -125,4 +125,36 @@
       HoodieWriteHandle<?, ?, ?, ?> writeHandle,
       String instantTime,
       List<HoodieRecord<T>> preppedRecords);
+
+  /**
+   * Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime,
+   * for the partition paths contained in input records.
+   *
+   * @param context HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant time for the replace action
+   * @param records input records
+   * @return HoodieWriteMetadata
+   */
+  HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> records);
+
+  /**
+   * Delete all the existing records of the Hoodie table and inserts the specified new records into Hoodie table at the supplied instantTime,

Review comment:
       Unify the verb? Choose either `Deletes` or `Replaces` (as in the method above)?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
##########
@@ -137,6 +149,8 @@ public void start() throws Exception {
     reset();
     // writeClient
     this.writeClient = StreamerUtil.createWriteClient(conf, null);
+    // table state

Review comment:
       Is this comment redundant?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
