vinothchandar commented on code in PR #13340:
URL: https://github.com/apache/hudi/pull/13340#discussion_r2110604041
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -666,71 +632,64 @@ protected void runTableServicesInline(HoodieTable table,
HoodieCommitMetadata me
*
* @param extraMetadata Metadata to pass onto the scheduled service
instant
* @param tableServiceType Type of table service to schedule
- * @return
+ * @return the requested instant time if the service was scheduled
*/
- public Option<String> scheduleTableService(String instantTime,
Option<Map<String, String>> extraMetadata,
+ public Option<String> scheduleTableService(Option<Map<String, String>>
extraMetadata,
TableServiceType
tableServiceType) {
- // A lock is required to guard against race conditions between an ongoing
writer and scheduling a table service.
- HoodieTableConfig tableConfig =
HoodieTableConfig.loadFromHoodieProps(storage, config.getBasePath());
- InstantGenerator instantGenerator =
TimelineLayout.fromVersion(tableConfig.getTableVersion().getTimelineLayoutVersion()).getInstantGenerator();
- final Option<HoodieInstant> inflightInstant =
Option.of(instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED,
- tableServiceType.getAction(), instantTime));
- try {
- this.txnManager.beginTransaction(inflightInstant, Option.empty());
- LOG.info("Scheduling table service {} for table {}", tableServiceType,
config.getBasePath());
- return scheduleTableServiceInternal(instantTime, extraMetadata,
tableServiceType);
- } finally {
- this.txnManager.endTransaction(inflightInstant);
- }
+ return scheduleTableServiceInternal(Option.empty(), extraMetadata,
tableServiceType);
}
- protected Option<String> scheduleTableServiceInternal(String instantTime,
Option<Map<String, String>> extraMetadata,
- TableServiceType
tableServiceType) {
+ Option<String> scheduleTableServiceInternal(Option<String>
providedInstantTime, Option<Map<String, String>> extraMetadata,
+ TableServiceType
tableServiceType) {
if (!tableServicesEnabled(config)) {
return Option.empty();
}
+ if (tableServiceType == TableServiceType.ARCHIVE) {
+ LOG.info("Scheduling archiving is not supported. Skipping.");
+ return Option.empty();
+ }
+ if (tableServiceType == TableServiceType.CLEAN) {
+ // Cleaning acts on historical commits and is handled differently
Review Comment:
// Cleaning is a frequent operation that does not conflict with other
operations and is idempotent. So handled differently to avoid locking for
planning.
Can we change the comment to this?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -53,16 +53,15 @@
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T,
I, K, O, Option<HoodieCleanerPlan>> {
-
+ public static final HoodieCleanerPlan EMPTY_CLEANER_PLAN = new
HoodieCleanerPlan();
private static final Logger LOG =
LoggerFactory.getLogger(CleanPlanActionExecutor.class);
private final Option<Map<String, String>> extraMetadata;
public CleanPlanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
- String instantTime,
Option<Map<String, String>> extraMetadata) {
- super(context, config, table, instantTime);
+ super(context, config, table, null);
Review Comment:
Use of `null`... but it'll change many things if we change
`BaseActionExecutor`. OK for now.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -848,22 +790,59 @@ public HoodieCleanMetadata clean(String cleanInstantTime,
boolean scheduleInline
}
}
- if (hasInflightClean || scheduledClean) {
+ if (inflightClean.isPresent() || cleanInstantTime.isPresent()) {
table.getMetaClient().reloadActiveTimeline();
// Proceeds to execute any requested or inflight clean instances in the
timeline
- HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime);
+ String cleanInstant = cleanInstantTime.isPresent() ?
cleanInstantTime.get() : inflightClean.get();
+ HoodieCleanMetadata metadata = table.clean(context, cleanInstant);
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs,
metadata.getTotalFilesDeleted());
LOG.info("Cleaned {} files Earliest Retained Instant :{}
cleanerElapsedMs: {}",
metadata.getTotalFilesDeleted(),
metadata.getEarliestCommitToRetain(), durationMs);
}
- releaseResources(cleanInstantTime);
+ releaseResources(cleanInstant);
return metadata;
}
return null;
}
+ /**
+ * Computes a cleaner plan and persists it to the timeline if cleaning is
required.
+ *
+ * @param table table to schedule cleaning on.
+ * @param suppliedCleanInstant Optional supplied clean instant time that
overrides the generated time. This can only be used for testing.
+ * @return the requested instant time if the service was scheduled
+ */
+ private Option<String> scheduleCleaning(HoodieTable<?, ?, ?, ?> table,
Option<String> suppliedCleanInstant) {
Review Comment:
Note to self: who calls this, and can we avoid this special handling and just have
the cleaner retry normally?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -666,71 +632,64 @@ protected void runTableServicesInline(HoodieTable table,
HoodieCommitMetadata me
*
* @param extraMetadata Metadata to pass onto the scheduled service
instant
* @param tableServiceType Type of table service to schedule
- * @return
+ * @return the requested instant time if the service was scheduled
*/
- public Option<String> scheduleTableService(String instantTime,
Option<Map<String, String>> extraMetadata,
+ public Option<String> scheduleTableService(Option<Map<String, String>>
extraMetadata,
TableServiceType
tableServiceType) {
- // A lock is required to guard against race conditions between an ongoing
writer and scheduling a table service.
- HoodieTableConfig tableConfig =
HoodieTableConfig.loadFromHoodieProps(storage, config.getBasePath());
- InstantGenerator instantGenerator =
TimelineLayout.fromVersion(tableConfig.getTableVersion().getTimelineLayoutVersion()).getInstantGenerator();
- final Option<HoodieInstant> inflightInstant =
Option.of(instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED,
- tableServiceType.getAction(), instantTime));
- try {
- this.txnManager.beginTransaction(inflightInstant, Option.empty());
- LOG.info("Scheduling table service {} for table {}", tableServiceType,
config.getBasePath());
- return scheduleTableServiceInternal(instantTime, extraMetadata,
tableServiceType);
- } finally {
- this.txnManager.endTransaction(inflightInstant);
- }
+ return scheduleTableServiceInternal(Option.empty(), extraMetadata,
tableServiceType);
}
- protected Option<String> scheduleTableServiceInternal(String instantTime,
Option<Map<String, String>> extraMetadata,
- TableServiceType
tableServiceType) {
+ Option<String> scheduleTableServiceInternal(Option<String>
providedInstantTime, Option<Map<String, String>> extraMetadata,
+ TableServiceType
tableServiceType) {
if (!tableServicesEnabled(config)) {
return Option.empty();
}
+ if (tableServiceType == TableServiceType.ARCHIVE) {
+ LOG.info("Scheduling archiving is not supported. Skipping.");
+ return Option.empty();
+ }
+ if (tableServiceType == TableServiceType.CLEAN) {
+ // Cleaning acts on historical commits and is handled differently
+ return scheduleCleaning(createTable(config, storageConf),
providedInstantTime);
+ }
+ txnManager.beginTransaction(Option.empty(), Option.empty());
Review Comment:
I think we should rename beginTransaction/endTransaction to
beginLock()/endLock() or lock()/unlock().
This naming is a relic from the time when only writers took the lock, not
table services, because all table services ran inline during one of the writer operations.
This can be done in a separate PR; I can take care of it.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -97,6 +95,7 @@
import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.metadata.HoodieTableMetadata.isMetadataTable;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
+import static
org.apache.hudi.table.action.clean.CleanPlanActionExecutor.EMPTY_CLEANER_PLAN;
Review Comment:
Is there a way we can avoid this EMPTY_CLEANER_PLAN and achieve the same using
Option.empty()? If not, happy to budge.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1633,14 +1634,10 @@ protected void cleanIfNecessary(BaseHoodieWriteClient
writeClient, String instan
// Trigger cleaning with suffixes based on the same instant time. This
ensures that any future
// delta commits synced over will not have an instant time lesser than the
last completed instant on the
// metadata table.
- writeClient.clean(createCleanInstantTime(instantTime));
+ writeClient.clean();
Review Comment:
@nsivabalan to confirm if this is okay.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1015,17 +997,19 @@ String startCommit(Option<String> providedInstantTime,
String actionType, Hoodie
* @param extraMetadata Extra Metadata to be stored
*/
public Option<String> scheduleCompaction(Option<Map<String, String>>
extraMetadata) throws HoodieIOException {
- String instantTime = createNewInstantTime();
- return scheduleCompactionAtInstant(instantTime, extraMetadata) ?
Option.of(instantTime) : Option.empty();
+ return scheduleTableService(Option.empty(), extraMetadata,
TableServiceType.COMPACT);
}
/**
* Schedules a new compaction instant with passed-in instant time.
* @param instantTime Compaction Instant Time
* @param extraMetadata Extra Metadata to be stored
+ * @deprecated As of release 1.1.0, use {@link #scheduleCompaction(Option)}
instead.
+ * The instant time must be generated within the same transaction as the
plan for proper consistency guarantees.
Review Comment:
nit: same transaction -> same lock
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -848,22 +790,59 @@ public HoodieCleanMetadata clean(String cleanInstantTime,
boolean scheduleInline
}
}
- if (hasInflightClean || scheduledClean) {
+ if (inflightClean.isPresent() || cleanInstantTime.isPresent()) {
table.getMetaClient().reloadActiveTimeline();
// Proceeds to execute any requested or inflight clean instances in the
timeline
- HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime);
+ String cleanInstant = cleanInstantTime.isPresent() ?
cleanInstantTime.get() : inflightClean.get();
Review Comment:
Use a different name, since it holds the same string/time as cleanInstantTime, e.g.
finalCleanInstantTime.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -543,16 +543,14 @@ public abstract Option<HoodieClusteringPlan>
scheduleClustering(HoodieEngineCont
public abstract void rollbackBootstrap(HoodieEngineContext context, String
instantTime);
/**
- * Schedule cleaning for the instant time.
+ * Generates a cleaner plan if required.
*
- * @param context HoodieEngineContext
- * @param instantTime Instant Time for scheduling cleaning
+ * @param context HoodieEngineContext
* @param extraMetadata additional metadata to write into plan
* @return HoodieCleanerPlan, if there is anything to clean.
*/
- public abstract Option<HoodieCleanerPlan>
scheduleCleaning(HoodieEngineContext context,
- String
instantTime,
-
Option<Map<String, String>> extraMetadata);
+ public abstract Option<HoodieCleanerPlan>
createCleanerPlan(HoodieEngineContext context,
Review Comment:
This API is probably not consistent with the rest of the methods here? For example,
indexing/rollback/restore etc. still have schedule methods.
Bigger point: I think we need to handle indexing, rollback and restore
in the same way as well, i.e. treat them as table services? At least indexing is definitely
something to be handled.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -848,22 +790,59 @@ public HoodieCleanMetadata clean(String cleanInstantTime,
boolean scheduleInline
}
}
- if (hasInflightClean || scheduledClean) {
+ if (inflightClean.isPresent() || cleanInstantTime.isPresent()) {
table.getMetaClient().reloadActiveTimeline();
// Proceeds to execute any requested or inflight clean instances in the
timeline
- HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime);
+ String cleanInstant = cleanInstantTime.isPresent() ?
cleanInstantTime.get() : inflightClean.get();
+ HoodieCleanMetadata metadata = table.clean(context, cleanInstant);
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs,
metadata.getTotalFilesDeleted());
LOG.info("Cleaned {} files Earliest Retained Instant :{}
cleanerElapsedMs: {}",
metadata.getTotalFilesDeleted(),
metadata.getEarliestCommitToRetain(), durationMs);
}
- releaseResources(cleanInstantTime);
+ releaseResources(cleanInstant);
return metadata;
}
return null;
}
+ /**
+ * Computes a cleaner plan and persists it to the timeline if cleaning is
required.
+ *
+ * @param table table to schedule cleaning on.
+ * @param suppliedCleanInstant Optional supplied clean instant time that
overrides the generated time. This can only be used for testing.
+ * @return the requested instant time if the service was scheduled
+ */
+ private Option<String> scheduleCleaning(HoodieTable<?, ?, ?, ?> table,
Option<String> suppliedCleanInstant) {
+ Option<HoodieCleanerPlan> cleanerPlan = table.createCleanerPlan(context,
Option.empty());
+ if (cleanerPlan.isPresent()) {
+ // handle special case where planner returns empty plan due to
corruption of existing instant
Review Comment:
I am curious what this is, whether it's a band-aid, and whether we still need it. But
it's OK to leave it alone in this PR.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -666,71 +632,64 @@ protected void runTableServicesInline(HoodieTable table,
HoodieCommitMetadata me
*
* @param extraMetadata Metadata to pass onto the scheduled service
instant
* @param tableServiceType Type of table service to schedule
- * @return
+ * @return the requested instant time if the service was scheduled
*/
- public Option<String> scheduleTableService(String instantTime,
Option<Map<String, String>> extraMetadata,
+ public Option<String> scheduleTableService(Option<Map<String, String>>
extraMetadata,
TableServiceType
tableServiceType) {
- // A lock is required to guard against race conditions between an ongoing
writer and scheduling a table service.
- HoodieTableConfig tableConfig =
HoodieTableConfig.loadFromHoodieProps(storage, config.getBasePath());
- InstantGenerator instantGenerator =
TimelineLayout.fromVersion(tableConfig.getTableVersion().getTimelineLayoutVersion()).getInstantGenerator();
- final Option<HoodieInstant> inflightInstant =
Option.of(instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED,
- tableServiceType.getAction(), instantTime));
- try {
- this.txnManager.beginTransaction(inflightInstant, Option.empty());
- LOG.info("Scheduling table service {} for table {}", tableServiceType,
config.getBasePath());
- return scheduleTableServiceInternal(instantTime, extraMetadata,
tableServiceType);
- } finally {
- this.txnManager.endTransaction(inflightInstant);
- }
+ return scheduleTableServiceInternal(Option.empty(), extraMetadata,
tableServiceType);
}
- protected Option<String> scheduleTableServiceInternal(String instantTime,
Option<Map<String, String>> extraMetadata,
- TableServiceType
tableServiceType) {
+ Option<String> scheduleTableServiceInternal(Option<String>
providedInstantTime, Option<Map<String, String>> extraMetadata,
+ TableServiceType
tableServiceType) {
if (!tableServicesEnabled(config)) {
return Option.empty();
}
+ if (tableServiceType == TableServiceType.ARCHIVE) {
+ LOG.info("Scheduling archiving is not supported. Skipping.");
+ return Option.empty();
+ }
+ if (tableServiceType == TableServiceType.CLEAN) {
+ // Cleaning acts on historical commits and is handled differently
+ return scheduleCleaning(createTable(config, storageConf),
providedInstantTime);
+ }
+ txnManager.beginTransaction(Option.empty(), Option.empty());
+ try {
+ Option<String> option;
+ HoodieTable<?, ?, ?, ?> table = createTable(config, storageConf);
+ String instantTime = providedInstantTime.orElseGet(() ->
createNewInstantTime(false));
- Option<String> option = Option.empty();
- HoodieTable<?, ?, ?, ?> table = createTable(config, storageConf);
+ switch (tableServiceType) {
+ case CLUSTER:
+ LOG.info("Scheduling clustering at instant time: {} for table {}",
instantTime, config.getBasePath());
+ Option<HoodieClusteringPlan> clusteringPlan = table
+ .scheduleClustering(context, instantTime, extraMetadata);
+ option = clusteringPlan.isPresent() ? Option.of(instantTime) :
Option.empty();
Review Comment:
Does `option = clusteringPlan.map(plan -> instantTime);` achieve the same?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -666,71 +632,64 @@ protected void runTableServicesInline(HoodieTable table,
HoodieCommitMetadata me
*
* @param extraMetadata Metadata to pass onto the scheduled service
instant
* @param tableServiceType Type of table service to schedule
- * @return
+ * @return the requested instant time if the service was scheduled
*/
- public Option<String> scheduleTableService(String instantTime,
Option<Map<String, String>> extraMetadata,
+ public Option<String> scheduleTableService(Option<Map<String, String>>
extraMetadata,
TableServiceType
tableServiceType) {
- // A lock is required to guard against race conditions between an ongoing
writer and scheduling a table service.
- HoodieTableConfig tableConfig =
HoodieTableConfig.loadFromHoodieProps(storage, config.getBasePath());
- InstantGenerator instantGenerator =
TimelineLayout.fromVersion(tableConfig.getTableVersion().getTimelineLayoutVersion()).getInstantGenerator();
- final Option<HoodieInstant> inflightInstant =
Option.of(instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED,
- tableServiceType.getAction(), instantTime));
- try {
- this.txnManager.beginTransaction(inflightInstant, Option.empty());
- LOG.info("Scheduling table service {} for table {}", tableServiceType,
config.getBasePath());
- return scheduleTableServiceInternal(instantTime, extraMetadata,
tableServiceType);
- } finally {
- this.txnManager.endTransaction(inflightInstant);
- }
+ return scheduleTableServiceInternal(Option.empty(), extraMetadata,
tableServiceType);
}
- protected Option<String> scheduleTableServiceInternal(String instantTime,
Option<Map<String, String>> extraMetadata,
- TableServiceType
tableServiceType) {
+ Option<String> scheduleTableServiceInternal(Option<String>
providedInstantTime, Option<Map<String, String>> extraMetadata,
+ TableServiceType
tableServiceType) {
if (!tableServicesEnabled(config)) {
return Option.empty();
}
+ if (tableServiceType == TableServiceType.ARCHIVE) {
+ LOG.info("Scheduling archiving is not supported. Skipping.");
Review Comment:
change to: "Archival does not need scheduling. Skipping."
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1583,6 +1583,7 @@ static HoodieActiveTimeline
runPendingTableServicesOperationsAndRefreshTimeline(
* deltacommit.
*/
void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> writeClient,
Option<String> latestDeltaCommitTimeOpt) {
+ // TODO how to handle this case where compaction needs to be written in
the past
Review Comment:
Can you elaborate on this scenario?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCleanerService.java:
##########
@@ -51,7 +51,7 @@ protected Pair<CompletableFuture, ExecutorService>
startService() {
String instantTime = writeClient.createNewInstantTime();
LOG.info("Starting async clean service with instant time {}.",
instantTime);
Review Comment:
Can we get rid of the instant time generation and the log message?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -868,50 +868,32 @@ private Pair<String, Option<HoodieRestorePlan>>
scheduleAndGetRestorePlan(final
* configurations and CleaningPolicy used. (typically files that no longer
can be used by a running query can be
* cleaned)
*/
+ @Deprecated
public HoodieCleanMetadata clean(String cleanInstantTime) throws
HoodieIOException {
- return clean(cleanInstantTime, true, false);
+ return tableServiceClient.clean(Option.of(cleanInstantTime), true);
}
/**
* Clean up any stale/old files/data lying around (either on file storage or
index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer
can be used by a running query can be
* cleaned)
- * @param cleanInstantTime instant time for clean.
- * @param skipLocking if this is triggered by another parent transaction,
locking can be skipped.
- * @return instance of {@link HoodieCleanMetadata}.
+ *
+ * @param scheduleInline true if needs to be scheduled inline. false
otherwise.
+ * @return the metadata generated by the clean operation
*/
- @Deprecated
- public HoodieCleanMetadata clean(String cleanInstantTime, boolean
skipLocking) throws HoodieIOException {
- return clean(cleanInstantTime, true, false);
+ public HoodieCleanMetadata clean(boolean scheduleInline) {
Review Comment:
Note to self: the method now controls whether it's inline or not.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java:
##########
@@ -61,13 +61,12 @@ public void open(Configuration parameters) throws Exception
{
super.open(parameters);
this.writeClient = FlinkWriteClients.createWriteClient(conf,
getRuntimeContext());
this.executor =
NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
- String instantTime = writeClient.createNewInstantTime();
- LOG.info(String.format("exec clean with instant time %s...", instantTime));
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
executor.execute(() -> {
this.isCleaning = true;
try {
- this.writeClient.clean(instantTime);
+ LOG.info("Starting clean");
Review Comment:
nit: do we need this, given we don't know the instant time? Instead, ensure that
writeClient.clean() logs this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]