vinothchandar commented on code in PR #13380:
URL: https://github.com/apache/hudi/pull/13380#discussion_r2122291300


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1170,7 +1175,12 @@ public void rollbackFailedBootstrap() {
     if (instant.isPresent() && compareTimestamps(instant.get(), LESSER_THAN_OR_EQUALS,
         HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
       LOG.info("Found pending bootstrap instants. Rolling them back");
-      table.rollbackBootstrap(context, createNewInstantTime());
+      txnManager.beginTransaction(Option.empty(), Option.empty());

Review Comment:
   Wonder if we should have a `txnManager.doWithinTransaction(Runnable)` (or `txnManager.changeState(Runnable)`) and pass in lambdas, instead of these begin/end blocks everywhere.
   
   Maybe later, someday.
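Here is a minimal sketch of the `doWithinTransaction(Runnable)` idea, under a few assumptions. `SketchTxnManager` is a hypothetical stand-in for Hudi's `TransactionManager`: it uses `java.util.Optional` instead of Hudi's `Option` and a `ReentrantLock` instead of the configured lock provider. `computeWithinTransaction` is a hypothetical value-returning variant. Neither is an existing Hudi API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical stand-in for TransactionManager (Optional instead of Hudi's Option,
// ReentrantLock instead of the configured lock provider).
class SketchTxnManager {
  private final ReentrantLock lock = new ReentrantLock();
  final List<String> events = new ArrayList<>();

  void beginTransaction(Optional<String> currentInstant, Optional<String> lastCompletedInstant) {
    lock.lock();
    events.add("begin");
  }

  void endTransaction(Optional<String> currentInstant) {
    events.add("end");
    lock.unlock();
  }

  boolean isHeldByCurrentThread() {
    return lock.isHeldByCurrentThread();
  }

  // Proposed helper: begin/try/finally/end lives in one place,
  // so a call site cannot forget to release the lock.
  void doWithinTransaction(Runnable stateChange) {
    beginTransaction(Optional.empty(), Optional.empty());
    try {
      stateChange.run();
    } finally {
      endTransaction(Optional.empty());
    }
  }

  // Hypothetical value-returning variant, e.g. to generate an instant time under the lock.
  <T> T computeWithinTransaction(Supplier<T> stateChange) {
    beginTransaction(Optional.empty(), Optional.empty());
    try {
      return stateChange.get();
    } finally {
      endTransaction(Optional.empty());
    }
  }
}

class DoWithinTransactionDemo {
  public static void main(String[] args) {
    SketchTxnManager txnManager = new SketchTxnManager();
    String instantTime = txnManager.computeWithinTransaction(() -> "20250101000000000");
    txnManager.doWithinTransaction(() -> System.out.println("schedule rollback at " + instantTime));
    System.out.println(txnManager.events); // [begin, end, begin, end]
  }
}
```

With this shape, the `finally` needed at each call site moves into the helper. A lambda that throws still releases the lock.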



##########
azure-pipelines-20230430.yml:
##########
@@ -128,7 +128,7 @@ stages:
     jobs:
       - job: UT_FT_1
         displayName: UT client/spark-client
-        timeoutInMinutes: '75'
+        timeoutInMinutes: '90'

Review Comment:
   Do we know why these are needed? Wondering if the extra locks are adding overhead (they should not, if most tests are using a JVM synchronized block or something similar).



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java:
##########
@@ -480,8 +481,8 @@ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPart
     }
 
     try (HoodieFlinkWriteClient<?> writeClient = HoodieCatalogUtil.createWriteClient(options, tablePathStr, tablePath, hadoopConf)) {
-      writeClient.deletePartitions(Collections.singletonList(partitionPathStr),
-              writeClient.createNewInstantTime())
+      String instantTime = writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION);

Review Comment:
   nts: this line will do the state change safely, i.e. request the write first.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -142,12 +143,13 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
   private final InstantGenerator instantGenerator;
   private final InstantFileNameGenerator instantFileNameGenerator;
   private final InstantFileNameParser instantFileNameParser;
+  private transient TransactionManager transactionManager;

Review Comment:
   +1 on transient. 
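Here is a small illustration of what `transient` provides. It assumes the motivation is that `HoodieTable` is `Serializable` (per the class declaration in the diff), while a lock-holding transaction manager should not travel with it. `LockHolder`, `TableWithTransientField` and `TableWithoutTransient` are hypothetical classes, not Hudi types. Plain Java serialization skips the transient field, which comes back as `null` after deserialization. Without `transient`, serialization fails with `NotSerializableException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical collaborator that is not Serializable (e.g. one holding a lock).
class LockHolder {}

class TableWithTransientField implements Serializable {
  private static final long serialVersionUID = 1L;
  final String basePath;
  // Skipped by Java serialization; arrives as null after deserialization.
  transient LockHolder transactionManager;

  TableWithTransientField(String basePath, LockHolder transactionManager) {
    this.basePath = basePath;
    this.transactionManager = transactionManager;
  }
}

class TableWithoutTransient implements Serializable {
  private static final long serialVersionUID = 1L;
  // Not transient: writing this object fails because LockHolder is not Serializable.
  LockHolder transactionManager = new LockHolder();
}

class TransientDemo {
  static byte[] serialize(Serializable value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    return bytes.toByteArray();
  }

  static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    TableWithTransientField copy = (TableWithTransientField) deserialize(
        serialize(new TableWithTransientField("/tmp/hudi_table", new LockHolder())));
    System.out.println(copy.basePath);                   // /tmp/hudi_table
    System.out.println(copy.transactionManager == null); // true
    try {
      serialize(new TableWithoutTransient());
    } catch (NotSerializableException e) {
      System.out.println("not serializable: " + e.getMessage()); // message names LockHolder
    }
  }
}
```

One consequence is that code running on a deserialized copy sees a `null` transaction manager. This includes, presumably, table instances shipped to executors. State changes that need the lock therefore have to happen on the side that still holds it.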



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -142,12 +143,13 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
   private final InstantGenerator instantGenerator;
   private final InstantFileNameGenerator instantFileNameGenerator;
   private final InstantFileNameParser instantFileNameParser;
+  private transient TransactionManager transactionManager;

Review Comment:
   Overall layering-wise: is it better to keep the transactionManager and the state changes at the `xxxClient` level, and leave the `xxxTable` classes to just do the work and return to the call sites in the client classes?
   
   If it's all over the place today already, I am okay with this for now. But if this is introducing a new pattern, can we see if some of this code can be pulled out of the table layer instead of passing the txnManager down?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -703,15 +709,22 @@ public void rollbackInflightClustering(HoodieInstant inflightInstant,
   void rollbackInflightInstant(HoodieInstant inflightInstant,
                                Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
     // Retrieve the rollback information using the provided function.
-    final Pair<String, Boolean> rollbackInfo = getPendingRollbackInstantFunc.apply(inflightInstant.requestedTime())
-        .map(entry -> Pair.of(entry.getRollbackInstant().requestedTime(), false))
-        .orElseGet(() -> Pair.of(getMetaClient().createNewInstantTime(), true));
-    // If a rollback has not scheduled (rollbackInfo.getRight() is true), schedule it.
-    if (rollbackInfo.getRight()) {
-      scheduleRollback(context, rollbackInfo.getLeft(), inflightInstant, false, config.shouldRollbackUsingMarkers(), false);
+    final Option<HoodiePendingRollbackInfo> rollbackInfo = getPendingRollbackInstantFunc.apply(inflightInstant.requestedTime());

Review Comment:
   nts: to check the pending rollback instant fn



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -621,7 +620,14 @@ protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata me
 
     //  Do an inline partition ttl management if enabled
     if (config.isInlinePartitionTTLEnable()) {
-      String instantTime = createNewInstantTime();
+      txnManager.beginTransaction(Option.empty(), Option.empty());

Review Comment:
   Does this PR now catch all the other remaining instances of this? i.e. can we chase down calls to `createNewInstantTime()` fully and fix them in this PR?
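A small sketch shows why every `createNewInstantTime()` call should run under the lock. It uses a hypothetical `InstantTimeline` in which a counter stands in for instant-time generation and a list records the order in which "requested" instants reach the timeline. This is not Hudi's implementation. Generation and the requested write share one critical section, which is what the reviewer describes `startCommit` doing, so instants land in generation order. If the counter were bumped before taking the lock, one writer could draw a smaller instant and still append it after a larger one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model: a counter stands in for the instant-time generator and a list
// records the order in which "requested" instants reach the timeline.
class InstantTimeline {
  private final Object txnLock = new Object();
  private final List<Long> requested = new ArrayList<>();
  private long lastInstant = 0L;

  // Generate and request inside the same lock, so timeline order matches generation order.
  long startCommit() {
    synchronized (txnLock) {
      long instant = ++lastInstant;
      requested.add(instant);
      return instant;
    }
  }

  List<Long> requestedInOrder() {
    synchronized (txnLock) {
      return new ArrayList<>(requested);
    }
  }
}

class InstantOrderingDemo {
  static List<Long> runWriters(int writers, int commitsPerWriter) throws InterruptedException {
    InstantTimeline timeline = new InstantTimeline();
    ExecutorService pool = Executors.newFixedThreadPool(writers);
    for (int w = 0; w < writers; w++) {
      pool.submit(() -> {
        for (int i = 0; i < commitsPerWriter; i++) {
          timeline.startCommit();
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    return timeline.requestedInOrder();
  }

  public static void main(String[] args) throws InterruptedException {
    List<Long> order = runWriters(4, 1_000);
    boolean increasing = true;
    for (int i = 1; i < order.size(); i++) {
      increasing &= order.get(i - 1) < order.get(i);
    }
    System.out.println(order.size() + " instants, strictly increasing: " + increasing);
  }
}
```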



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -341,22 +340,26 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
   }
 
   protected HoodieTable createTableAndValidate(HoodieWriteConfig writeConfig,
-                                               BiFunction<HoodieWriteConfig, HoodieEngineContext, HoodieTable> createTableFn) {
-    HoodieTable table = createTableFn.apply(writeConfig, context);
+                                               TableCreator createTableFn) {
+    HoodieTable table = createTableFn.apply(writeConfig, context, txnManager);
     CommonClientUtils.validateTableVersion(table.getMetaClient().getTableConfig(), writeConfig);
     return table;
   }
 
   @FunctionalInterface
-  protected interface TriFunction<T, U, V, R> {
-    R apply(T t, U u, V v);
+  protected interface TableCreator {
+    HoodieTable apply(HoodieWriteConfig writeConfig, HoodieEngineContext context, TransactionManager transactionManager);
+  }
+
+  @FunctionalInterface
+  protected interface TableCreatorWithMetaClient {
+    HoodieTable apply(HoodieWriteConfig writeConfig, HoodieEngineContext context, HoodieTableMetaClient metaClient, TransactionManager transactionManager);

Review Comment:
   nit: can `metaClient` be the 4th arg, so that `TableCreatorWithMetaClient` cleanly extends `TableCreator` with one extra trailing argument?
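Here is a sketch of the argument order the nit asks for, using hypothetical placeholder types. `WriteConfig`, `EngineContext`, `TxnManager`, `MetaClient` and `Table` stand in for the Hudi classes, and only the argument order matters here. With `metaClient` last, the 4-arg creator is the 3-arg one plus a trailing argument. Binding that argument then yields a plain `TableCreator`. The `bind` default method is a hypothetical addition, not part of the PR.

```java
// Hypothetical placeholders for HoodieWriteConfig, HoodieEngineContext, TransactionManager,
// HoodieTableMetaClient and HoodieTable.
final class WriteConfig {}
final class EngineContext {}
final class TxnManager {}
final class MetaClient {
  final String basePath;
  MetaClient(String basePath) { this.basePath = basePath; }
}
final class Table {
  final MetaClient metaClient;
  Table(MetaClient metaClient) { this.metaClient = metaClient; }
}

@FunctionalInterface
interface TableCreator {
  Table apply(WriteConfig writeConfig, EngineContext context, TxnManager transactionManager);
}

@FunctionalInterface
interface TableCreatorWithMetaClient {
  // Same leading arguments as TableCreator, with metaClient appended as the 4th argument.
  Table apply(WriteConfig writeConfig, EngineContext context, TxnManager transactionManager, MetaClient metaClient);

  // Hypothetical helper: fixing the trailing argument turns this into a plain TableCreator.
  default TableCreator bind(MetaClient metaClient) {
    return (writeConfig, context, transactionManager) ->
        apply(writeConfig, context, transactionManager, metaClient);
  }
}

class TableCreatorDemo {
  public static void main(String[] args) {
    TableCreatorWithMetaClient withMeta = (cfg, ctx, tm, mc) -> new Table(mc);
    TableCreator plain = withMeta.bind(new MetaClient("/tmp/hudi_table"));
    Table table = plain.apply(new WriteConfig(), new EngineContext(), new TxnManager());
    System.out.println(table.metaClient.basePath); // /tmp/hudi_table
  }
}
```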



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java:
##########
@@ -111,11 +111,7 @@ public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath, SQL
   public SparkRDDReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) {
     this.context = context;
     this.storageConf = context.getStorageConf();
-    final String basePath = clientConfig.getBasePath();

Review Comment:
   We should kill this class sometime.



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java:
##########
@@ -117,7 +119,7 @@ public HoodieWriteMetadata<List<WriteStatus>> compact(
       HoodieEngineContext context, String compactionInstantTime) {
     RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(
         context, config, this, compactionInstantTime, new HoodieFlinkMergeOnReadTableCompactor(),
-        new HoodieFlinkCopyOnWriteTable(config, context, getMetaClient()), WriteOperationType.COMPACT);
+        this, WriteOperationType.COMPACT);

Review Comment:
   Sane. But is something working today only because the new table object re-initializes everything all over again? If so, this change may break it.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -703,15 +709,22 @@ public void rollbackInflightClustering(HoodieInstant inflightInstant,
   void rollbackInflightInstant(HoodieInstant inflightInstant,
                                Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
     // Retrieve the rollback information using the provided function.
-    final Pair<String, Boolean> rollbackInfo = getPendingRollbackInstantFunc.apply(inflightInstant.requestedTime())
-        .map(entry -> Pair.of(entry.getRollbackInstant().requestedTime(), false))
-        .orElseGet(() -> Pair.of(getMetaClient().createNewInstantTime(), true));
-    // If a rollback has not scheduled (rollbackInfo.getRight() is true), schedule it.
-    if (rollbackInfo.getRight()) {
-      scheduleRollback(context, rollbackInfo.getLeft(), inflightInstant, false, config.shouldRollbackUsingMarkers(), false);
+    final Option<HoodiePendingRollbackInfo> rollbackInfo = getPendingRollbackInstantFunc.apply(inflightInstant.requestedTime());
+    // If a rollback has not scheduled, schedule it.
+    String instantTime;
+    if (rollbackInfo.isEmpty()) {
+      transactionManager.beginTransaction(Option.empty(), Option.empty());
+      try {
+        instantTime = getMetaClient().createNewInstantTime(false);
+        scheduleRollback(context, instantTime, inflightInstant, false, config.shouldRollbackUsingMarkers(), false);
+      } finally {
+        transactionManager.endTransaction(Option.empty());
+      }
+    } else {
+      instantTime = rollbackInfo.get().getRollbackInstant().requestedTime();
     }
     // Perform the rollback.
-    rollback(context, rollbackInfo.getLeft(), inflightInstant, false, false);
+    rollback(context, instantTime, inflightInstant, false, false);

Review Comment:
   nts: check if this will call txnManager for state change



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -723,11 +736,17 @@ void rollbackInflightInstant(HoodieInstant inflightInstant,
    * @param inflightInstant Inflight Compaction Instant
    */
   public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
-    final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.requestedTime()).map(entry
-        -> entry.getRollbackInstant().requestedTime())
-        .orElseGet(() -> getMetaClient().createNewInstantTime());
-    scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
-        false);
+    transactionManager.beginTransaction(Option.empty(), Option.empty());

Review Comment:
   Can we define a no-arg overload `beginTransaction()` for `beginTransaction(empty, empty)`? Also, why wouldn't we pass in `inflightInstant` as the first arg? I get that the second arg is not really useful for every state change.
   
   Same question for all the other calls. I think the args are just used for logging, so passing them helps us trace state changes.
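Here is a minimal sketch of the proposed no-arg overload alongside a traced call that passes the inflight instant. It assumes, as the comment guesses, that the instant arguments are only used for logging. `TracingTxnManager` is a hypothetical stand-in: `java.util.Optional<String>` replaces Hudi's `Option<HoodieInstant>`, and a list replaces the real logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for TransactionManager; the instant args are only logged here.
class TracingTxnManager {
  private final ReentrantLock lock = new ReentrantLock();
  final List<String> log = new ArrayList<>();

  void beginTransaction(Optional<String> currentInstant, Optional<String> lastCompletedInstant) {
    lock.lock();
    log.add("begin txn for " + currentInstant.orElse("<none>")
        + ", last completed " + lastCompletedInstant.orElse("<none>"));
  }

  // Proposed no-arg overload for state changes with no instant worth tracing.
  void beginTransaction() {
    beginTransaction(Optional.empty(), Optional.empty());
  }

  void endTransaction(Optional<String> currentInstant) {
    log.add("end txn for " + currentInstant.orElse("<none>"));
    lock.unlock();
  }
}

class TracingDemo {
  public static void main(String[] args) {
    TracingTxnManager txnManager = new TracingTxnManager();
    // Passing the inflight instant makes this state change traceable in the log.
    Optional<String> inflight = Optional.of("20250101000000000");
    txnManager.beginTransaction(inflight, Optional.empty());
    txnManager.endTransaction(inflight);
    // The overload keeps call sites short when there is nothing to trace.
    txnManager.beginTransaction();
    txnManager.endTransaction(Optional.empty());
    txnManager.log.forEach(System.out::println);
  }
}
```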


