vinothchandar commented on code in PR #13380:
URL: https://github.com/apache/hudi/pull/13380#discussion_r2122291300
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1170,7 +1175,12 @@ public void rollbackFailedBootstrap() {
if (instant.isPresent() && compareTimestamps(instant.get(), LESSER_THAN_OR_EQUALS, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
LOG.info("Found pending bootstrap instants. Rolling them back");
- table.rollbackBootstrap(context, createNewInstantTime());
+ txnManager.beginTransaction(Option.empty(), Option.empty());
Review Comment:
I wonder if we should have a `txnManager.doWithinTransaction(Runnable)` (or
`txnManager.changeState(Runnable)`) and pass in lambdas, instead of these
begin/end blocks everywhere.
Maybe later, someday.
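A minimal sketch of what such a wrapper could look like. `SimpleTxnManager` here is a hypothetical stand-in backed by a plain `ReentrantLock`, not Hudi's `TransactionManager`, and `doWithinTransaction` is the name proposed above rather than an existing method:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class TxnSketch {
  /** Hypothetical lock-backed manager; NOT the real Hudi TransactionManager. */
  static class SimpleTxnManager {
    private final ReentrantLock lock = new ReentrantLock();
    // Records the order of calls so the wrapper's behavior is observable.
    final List<String> events = new ArrayList<>();

    void beginTransaction() {
      lock.lock();
      events.add("begin");
    }

    void endTransaction() {
      events.add("end");
      lock.unlock();
    }

    /** Runs the action between begin and end; end runs even if the action throws. */
    void doWithinTransaction(Runnable action) {
      beginTransaction();
      try {
        action.run();
      } finally {
        endTransaction();
      }
    }
  }

  public static void main(String[] args) {
    SimpleTxnManager txnManager = new SimpleTxnManager();
    // The call site shrinks to one statement; no begin/try/finally/end block.
    txnManager.doWithinTransaction(() -> txnManager.events.add("rollbackBootstrap"));
    System.out.println(txnManager.events);
  }
}
```

The point of the wrapper is that a call site can no longer forget the `finally` that releases the transaction.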
##########
azure-pipelines-20230430.yml:
##########
@@ -128,7 +128,7 @@ stages:
jobs:
- job: UT_FT_1
displayName: UT client/spark-client
- timeoutInMinutes: '75'
+ timeoutInMinutes: '90'
Review Comment:
Do we know why these are needed? I am wondering if the extra locks are adding
overhead (they should not, if most tests use a JVM synchronized block or
something similar).
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java:
##########
@@ -480,8 +481,8 @@ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPart
}
try (HoodieFlinkWriteClient<?> writeClient = HoodieCatalogUtil.createWriteClient(options, tablePathStr, tablePath, hadoopConf)) {
- writeClient.deletePartitions(Collections.singletonList(partitionPathStr), writeClient.createNewInstantTime())
+ String instantTime = writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION);
Review Comment:
Note to self: this line will do the state change safely, i.e. request the write first.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -142,12 +143,13 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
private final InstantGenerator instantGenerator;
private final InstantFileNameGenerator instantFileNameGenerator;
private final InstantFileNameParser instantFileNameParser;
+ private transient TransactionManager transactionManager;
Review Comment:
+1 on transient.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -142,12 +143,13 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
private final InstantGenerator instantGenerator;
private final InstantFileNameGenerator instantFileNameGenerator;
private final InstantFileNameParser instantFileNameParser;
+ private transient TransactionManager transactionManager;
Review Comment:
Overall, layering-wise: is it better to keep the transactionManager and the
state changes at the `xxxClient` level, and leave the `xxxTable` classes to
just do the work and return to the call sites in the client classes?
If it's all over the place today already, I am okay with this for now. But if
this is introducing a new pattern, can we see if some of this code can be
pulled out of the table layer instead of passing the txnManager down?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -703,15 +709,22 @@ public void rollbackInflightClustering(HoodieInstant
inflightInstant,
void rollbackInflightInstant(HoodieInstant inflightInstant,
Function<String,
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
// Retrieve the rollback information using the provided function.
- final Pair<String, Boolean> rollbackInfo =
getPendingRollbackInstantFunc.apply(inflightInstant.requestedTime())
- .map(entry -> Pair.of(entry.getRollbackInstant().requestedTime(),
false))
- .orElseGet(() -> Pair.of(getMetaClient().createNewInstantTime(),
true));
- // If a rollback has not scheduled (rollbackInfo.getRight() is true),
schedule it.
- if (rollbackInfo.getRight()) {
- scheduleRollback(context, rollbackInfo.getLeft(), inflightInstant,
false, config.shouldRollbackUsingMarkers(), false);
+ final Option<HoodiePendingRollbackInfo> rollbackInfo =
getPendingRollbackInstantFunc.apply(inflightInstant.requestedTime());
Review Comment:
Note to self: check the pending rollback instant function.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -621,7 +620,14 @@ protected void runTableServicesInline(HoodieTable table,
HoodieCommitMetadata me
// Do an inline partition ttl management if enabled
if (config.isInlinePartitionTTLEnable()) {
- String instantTime = createNewInstantTime();
+ txnManager.beginTransaction(Option.empty(), Option.empty());
Review Comment:
Does this PR now catch all other remaining instances of this? I.e., can we
chase down all calls to `createNewInstantTime()` and fix them in this PR?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -341,22 +340,26 @@ private void saveInternalSchema(HoodieTable table, String
instantTime, HoodieCom
}
protected HoodieTable createTableAndValidate(HoodieWriteConfig writeConfig,
- BiFunction<HoodieWriteConfig,
HoodieEngineContext, HoodieTable> createTableFn) {
- HoodieTable table = createTableFn.apply(writeConfig, context);
+ TableCreator createTableFn) {
+ HoodieTable table = createTableFn.apply(writeConfig, context, txnManager);
CommonClientUtils.validateTableVersion(table.getMetaClient().getTableConfig(),
writeConfig);
return table;
}
@FunctionalInterface
- protected interface TriFunction<T, U, V, R> {
- R apply(T t, U u, V v);
+ protected interface TableCreator {
+ HoodieTable apply(HoodieWriteConfig writeConfig, HoodieEngineContext
context, TransactionManager transactionManager);
+ }
+
+ @FunctionalInterface
+ protected interface TableCreatorWithMetaClient {
+ HoodieTable apply(HoodieWriteConfig writeConfig, HoodieEngineContext
context, HoodieTableMetaClient metaClient, TransactionManager
transactionManager);
Review Comment:
nit: can `metaClient` be the 4th arg, so that this cleanly extends the
`TableCreator` signature with the meta client?
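To illustrate the suggested ordering, here is a rough sketch. The nested types are empty placeholders for the Hudi classes named in the diff, and the `withMetaClient` helper is hypothetical, added only to show why a trailing `metaClient` composes well with `TableCreator`:

```java
class TableCreatorSketch {
  // Placeholder types standing in for the Hudi classes; assumptions for illustration only.
  static class WriteConfig {}
  static class EngineContext {}
  static class TxnManager {}

  static class MetaClient {
    final String basePath;
    MetaClient(String basePath) { this.basePath = basePath; }
  }

  static class Table {
    final String description;
    Table(String description) { this.description = description; }
  }

  @FunctionalInterface
  interface TableCreator {
    Table apply(WriteConfig writeConfig, EngineContext context, TxnManager txnManager);
  }

  @FunctionalInterface
  interface TableCreatorWithMetaClient {
    // Same first three parameters as TableCreator; metaClient is appended as the 4th.
    Table apply(WriteConfig writeConfig, EngineContext context, TxnManager txnManager, MetaClient metaClient);

    /** Hypothetical helper: fixes the trailing metaClient to obtain a plain TableCreator. */
    default TableCreator withMetaClient(MetaClient metaClient) {
      return (writeConfig, context, txnManager) -> apply(writeConfig, context, txnManager, metaClient);
    }
  }

  public static void main(String[] args) {
    TableCreatorWithMetaClient full =
        (writeConfig, context, txnManager, metaClient) -> new Table("table@" + metaClient.basePath);
    TableCreator bound = full.withMetaClient(new MetaClient("/tmp/hudi_table"));
    Table table = bound.apply(new WriteConfig(), new EngineContext(), new TxnManager());
    System.out.println(table.description);
  }
}
```

With the shared parameters in the same positions, the four-arg form reads as the three-arg form plus one extra argument.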
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java:
##########
@@ -111,11 +111,7 @@ public SparkRDDReadClient(HoodieSparkEngineContext
context, String basePath, SQL
public SparkRDDReadClient(HoodieSparkEngineContext context,
HoodieWriteConfig clientConfig) {
this.context = context;
this.storageConf = context.getStorageConf();
- final String basePath = clientConfig.getBasePath();
Review Comment:
We should kill this class sometime.
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java:
##########
@@ -117,7 +119,7 @@ public HoodieWriteMetadata<List<WriteStatus>> compact(
HoodieEngineContext context, String compactionInstantTime) {
RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(
context, config, this, compactionInstantTime, new HoodieFlinkMergeOnReadTableCompactor(),
- new HoodieFlinkCopyOnWriteTable(config, context, getMetaClient()), WriteOperationType.COMPACT);
+ this, WriteOperationType.COMPACT);
Review Comment:
Sane. But does something work today because the new table object initializes
everything all over again? If so, that may break.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -703,15 +709,22 @@ public void rollbackInflightClustering(HoodieInstant
inflightInstant,
void rollbackInflightInstant(HoodieInstant inflightInstant,
Function<String,
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
// Retrieve the rollback information using the provided function.
- final Pair<String, Boolean> rollbackInfo =
getPendingRollbackInstantFunc.apply(inflightInstant.requestedTime())
- .map(entry -> Pair.of(entry.getRollbackInstant().requestedTime(),
false))
- .orElseGet(() -> Pair.of(getMetaClient().createNewInstantTime(),
true));
- // If a rollback has not scheduled (rollbackInfo.getRight() is true),
schedule it.
- if (rollbackInfo.getRight()) {
- scheduleRollback(context, rollbackInfo.getLeft(), inflightInstant,
false, config.shouldRollbackUsingMarkers(), false);
+ final Option<HoodiePendingRollbackInfo> rollbackInfo =
getPendingRollbackInstantFunc.apply(inflightInstant.requestedTime());
+ // If a rollback has not scheduled, schedule it.
+ String instantTime;
+ if (rollbackInfo.isEmpty()) {
+ transactionManager.beginTransaction(Option.empty(), Option.empty());
+ try {
+ instantTime = getMetaClient().createNewInstantTime(false);
+ scheduleRollback(context, instantTime, inflightInstant, false,
config.shouldRollbackUsingMarkers(), false);
+ } finally {
+ transactionManager.endTransaction(Option.empty());
+ }
+ } else {
+ instantTime = rollbackInfo.get().getRollbackInstant().requestedTime();
}
// Perform the rollback.
- rollback(context, rollbackInfo.getLeft(), inflightInstant, false, false);
+ rollback(context, instantTime, inflightInstant, false, false);
Review Comment:
Note to self: check if this will call txnManager for the state change.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -723,11 +736,17 @@ void rollbackInflightInstant(HoodieInstant inflightInstant,
* @param inflightInstant Inflight Compaction Instant
*/
public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
- final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.requestedTime()).map(entry
- -> entry.getRollbackInstant().requestedTime())
- .orElseGet(() -> getMetaClient().createNewInstantTime());
- scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
- false);
+ transactionManager.beginTransaction(Option.empty(), Option.empty());
Review Comment:
Can we define a no-arg overload `beginTransaction()` for
`beginTransaction(empty, empty)`? Also, why would we not pass in `inflightInstant` as
the first arg? I get that the second arg is not useful for every state
change.
Same question for all other calls. I think the args are just used for logging, so
passing them helps us trace state changes.
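A rough sketch of the overload idea. `LoggingTxnManager` is a hypothetical stand-in, `java.util.Optional` replaces Hudi's `Option`, and the parameter names and log format are assumptions, not the real `TransactionManager` signature:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class BeginTransactionOverloadSketch {
  /** Hypothetical manager that only records what it was asked to begin. */
  static class LoggingTxnManager {
    final List<String> log = new ArrayList<>();

    /** Full form: in this sketch both instants are used purely for tracing. */
    void beginTransaction(Optional<String> currentInstant, Optional<String> lastCompletedInstant) {
      log.add("begin current=" + currentInstant.orElse("none")
          + " lastCompleted=" + lastCompletedInstant.orElse("none"));
    }

    /** No-arg overload for call sites that have no instant to report. */
    void beginTransaction() {
      beginTransaction(Optional.empty(), Optional.empty());
    }
  }

  public static void main(String[] args) {
    LoggingTxnManager txnManager = new LoggingTxnManager();
    txnManager.beginTransaction();
    // Passing the inflight instant makes the state change traceable in the log.
    txnManager.beginTransaction(Optional.of("20240101000000000"), Optional.empty());
    txnManager.log.forEach(System.out::println);
  }
}
```

The second call shows what passing `inflightInstant` would buy: the log line names the instant that triggered the state change.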