yihua commented on a change in pull request #3779:
URL: https://github.com/apache/hudi/pull/3779#discussion_r727640936



##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -257,6 +260,11 @@ protected void preWrite(String instantTime, 
WriteOperationType writeOperationTyp
 
   @Override
   protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    // Create a Hoodie table after startTxn which encapsulated the commits and 
files visible.
+    // Important to create this after the lock to ensure latest commits show 
up in the timeline without need for reload
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table = createTable(config, hadoopConf);
+    TransactionUtils.resolveWriteConflictIfAny(table, 
this.txnManager.getCurrentTransactionOwner(),

Review comment:
       I'm okay with this PR going in first.
   
   In general, if there are other write logic we'd like to add in Flink to make 
them on par with Spark, let me know so I can add them during the refactoring 
and reduce the efforts.

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -257,6 +260,11 @@ protected void preWrite(String instantTime, 
WriteOperationType writeOperationTyp
 
   @Override
   protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    // Create a Hoodie table after startTxn which encapsulated the commits and 
files visible.
+    // Important to create this after the lock to ensure latest commits show 
up in the timeline without need for reload
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table = createTable(config, hadoopConf);
+    TransactionUtils.resolveWriteConflictIfAny(table, 
this.txnManager.getCurrentTransactionOwner(),

Review comment:
       To my point, I've introduced `HoodieData` abstraction as the data 
structure for passing records, keys, write status, etc., which provides general 
transformation APIs like map, flatMap to hide engine-specific logic.  The 
related PR is here: https://github.com/apache/hudi/pull/3741.  At high level, 
in that way, we should be able to extract common logic from 
`SparkRDDWriteClient`, `HoodieFlinkWriteClient` and any new write actions and 
logic can be added to the common super class, supporting all engines, without 
the need of adding same logic to each engine client.  And Java write client 
class can override the logic in the subclass if needed.  I'm thinking that any 
future changes to this class beyond parallel writing by adding the same logic 
from Spark could be potential duplicate work.

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -257,6 +260,11 @@ protected void preWrite(String instantTime, 
WriteOperationType writeOperationTyp
 
   @Override
   protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    // Create a Hoodie table after startTxn which encapsulated the commits and 
files visible.
+    // Important to create this after the lock to ensure latest commits show 
up in the timeline without need for reload
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table = createTable(config, hadoopConf);
+    TransactionUtils.resolveWriteConflictIfAny(table, 
this.txnManager.getCurrentTransactionOwner(),

Review comment:
       Specifically, most differences between Spark and Flink are around 
JavaRDD vs List, which HoodieData abstracts.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to