yanghua commented on a change in pull request #2808:
URL: https://github.com/apache/hudi/pull/2808#discussion_r612203287
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -322,4 +326,12 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti
return new HoodieFlinkWriteClient<>(context, getHoodieClientConfig(conf));
}
+
+ public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
Review comment:
Can we extract this into `CommitUtils`?
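A rough sketch of what such a helper could look like once moved out of `StreamerUtil`. The enums and action-name constants below are simplified stand-ins, not the real Hudi types, and the branching is an assumption based on the overwrite handling in this PR:

```java
// Hypothetical holder class; stands in for a shared utility such as CommitUtils.
final class CommitUtilsSketch {
  // Stand-in enums (assumptions): simplified versions of the Hudi types named in the diff.
  public enum WriteOperationType { INSERT, UPSERT, INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE }
  public enum HoodieTableType { COPY_ON_WRITE, MERGE_ON_READ }

  // Action names assumed for illustration only.
  static final String COMMIT_ACTION = "commit";
  static final String DELTA_COMMIT_ACTION = "deltacommit";
  static final String REPLACE_COMMIT_ACTION = "replacecommit";

  private CommitUtilsSketch() {}

  // Overwrite operations map to a replace commit; otherwise the table type decides.
  public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
    if (operation == WriteOperationType.INSERT_OVERWRITE
        || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) {
      return REPLACE_COMMIT_ACTION;
    }
    return tableType == HoodieTableType.MERGE_ON_READ ? DELTA_COMMIT_ACTION : COMMIT_ACTION;
  }
}
```

Keeping the mapping in one shared utility would let the Flink module and other callers agree on the action type without duplicating the branch.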
##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
##########
@@ -130,6 +133,20 @@ public String asSummaryString() {
@Override
public void applyStaticPartition(Map<String, String> map) {
- // no operation
+ // #applyOverwrite should have been invoked.
+ if (this.overwrite) {
+ final String operationType;
+ if (map.size() > 0) {
Review comment:
Can we give this variable (`map`) a more readable name?
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
##########
@@ -33,19 +33,24 @@ private BucketAssigners() {}
/**
* Creates a {@code BucketAssigner}.
*
- * @param taskID The task ID
- * @param numTasks The number of tasks
- * @param tableType The table type
- * @param context The engine context
- * @param config The configuration
+ * @param taskID The task ID
Review comment:
Can you disable automatic formatting in your local environment?
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
##########
@@ -401,4 +422,26 @@ public OperatorCoordinator create(Context context) {
return new StreamWriteOperatorCoordinator(this.conf, context);
}
}
+
+ /**
+ * Remember some table state variables.
+ */
+ private static class TableState implements Serializable {
Review comment:
Shall we make this class follow the POJO conventions?
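For reference, a minimal sketch of a POJO-style state class, assuming "POJO" here means the usual rules Flink documents (public class, public no-argument constructor, fields public or reachable through getters and setters). The outer class name and both fields are hypothetical placeholders, not the fields in this PR:

```java
import java.io.Serializable;

// Hypothetical outer class standing in for the coordinator.
class CoordinatorSketch {

  /** Remembers some table state variables (fields below are placeholders). */
  public static class TableState implements Serializable {
    private static final long serialVersionUID = 1L;

    private String commitAction; // hypothetical field
    private boolean overwrite;   // hypothetical field

    // Public no-argument constructor, as POJO rules expect.
    public TableState() {}

    public String getCommitAction() {
      return commitAction;
    }

    public void setCommitAction(String commitAction) {
      this.commitAction = commitAction;
    }

    public boolean isOverwrite() {
      return overwrite;
    }

    public void setOverwrite(boolean overwrite) {
      this.overwrite = overwrite;
    }
  }
}
```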
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
##########
@@ -137,6 +149,8 @@ public void start() throws Exception {
reset();
// writeClient
Review comment:
ditto
##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
##########
@@ -125,4 +125,36 @@
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> preppedRecords);
+
+ /**
+ * Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime,
+ * for the partition paths contained in input records.
+ *
+ * @param context HoodieEngineContext
+ * @param writeHandle The write handle
+ * @param instantTime Instant time for the replace action
+ * @param records input records
+ * @return HoodieWriteMetadata
+ */
+ HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(
+ HoodieEngineContext context,
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+ String instantTime,
+ List<HoodieRecord<T>> records);
+
+ /**
+ * Delete all the existing records of the Hoodie table and inserts the specified new records into Hoodie table at the supplied instantTime,
Review comment:
Can we unify the verb? Choose either `Deletes` or `Replaces` (as in the method above).
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
##########
@@ -137,6 +149,8 @@ public void start() throws Exception {
reset();
// writeClient
this.writeClient = StreamerUtil.createWriteClient(conf, null);
+ // table state
Review comment:
Useless comment?