codope commented on a change in pull request #4420:
URL: https://github.com/apache/hudi/pull/4420#discussion_r785765597
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -464,27 +464,43 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
   }
   protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
-    if (config.areAnyTableServicesInline()) {
+    if (config.areAnyTableServicesInline() || config.scheduleInlineTableServices()) {
       if (config.isMetadataTableEnabled()) {
         table.getHoodieView().sync();
       }
       // Do an inline compaction if enabled
       if (config.inlineCompactionEnabled()) {
         runAnyPendingCompactions(table);
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
-        inlineCompact(extraMetadata);
+        inlineScheduleCompactAndOptionallyExecute(extraMetadata, !config.scheduleInlineCompaction());
Review comment:
Let's say INLINE_COMPACT is true and SCHEDULE_INLINE_COMPACT is also true. Then `inlineScheduleCompactAndOptionallyExecute` will schedule compaction but never execute it, right? That sounds counter-intuitive from the user's point of view. Don't you think?
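To make the case concrete, here is a tiny standalone model of the branching in the diff (the class and method names are invented for illustration; only the boolean logic mirrors the patch):

```java
public class InlineCompactDecisionSketch {
    // Models the call in the diff:
    //   inlineScheduleCompactAndOptionallyExecute(extraMetadata, !config.scheduleInlineCompaction())
    // guarded by config.inlineCompactionEnabled().
    static String action(boolean inlineCompactEnabled, boolean scheduleInlineCompact) {
        if (!inlineCompactEnabled) {
            return "no-op"; // branch never entered when INLINE_COMPACT is false
        }
        // shouldExecute is the second argument in the diff
        boolean shouldExecute = !scheduleInlineCompact;
        return shouldExecute ? "schedule+execute" : "schedule-only";
    }

    public static void main(String[] args) {
        // The case raised above: both flags true -> plan is scheduled but never executed
        System.out.println(action(true, true));   // prints "schedule-only"
        System.out.println(action(true, false));  // prints "schedule+execute"
    }
}
```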
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -177,6 +177,12 @@
       .withDocumentation("Determines how to handle updates, deletes to file groups that are under clustering."
           + " Default strategy just rejects the update");
+  public static final ConfigProperty<String> SCHEDULE_INLINE_CLUSTERING = ConfigProperty
+      .key("hoodie.clustering.schedule.inline")
+      .defaultValue("false")
+      .withDocumentation("When set to true, clustering service will be attempted for inline scheduling after each write. Users have to ensure "
Review comment:
I think we should add more details about how this interacts with (or differs from) the other inline/async configs; otherwise users may be confused about why we have separate configs. First, we could mention what happens if this config is enabled together with the existing inline/async compaction/clustering configs. Secondly, we could reference the corresponding config keys in the doc to be more explicit.
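Purely as an illustration of the kind of wording being asked for, the builder fragment below sketches one option (the described interplay with `hoodie.clustering.inline` is an assumption on my part and would need to be checked against the actual behavior before adopting any such text):

```
+      .withDocumentation("When set to true, a clustering plan is scheduled inline after each write, "
+          + "but not executed; execution can be picked up later, e.g. by an async clustering job. "
+          + "See also hoodie.clustering.inline, which (when enabled) both schedules and executes "
+          + "clustering inline as part of the write.")
```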
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -464,27 +464,43 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
   }
   protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
-    if (config.areAnyTableServicesInline()) {
+    if (config.areAnyTableServicesInline() || config.scheduleInlineTableServices()) {
       if (config.isMetadataTableEnabled()) {
         table.getHoodieView().sync();
       }
       // Do an inline compaction if enabled
       if (config.inlineCompactionEnabled()) {
         runAnyPendingCompactions(table);
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
Review comment:
Should we move this inside `inlineScheduleCompactAndOptionallyExecute` as well, and set it only when execution actually happens?
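A toy model of what that refactor could look like (the class, key constant, and method names are invented for the sketch; the real Hudi signatures differ):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of the suggestion: record the INLINE_COMPACT metadata flag inside the
// helper, so it is written only when compaction actually executes rather than
// whenever it is merely scheduled.
public class InlineCompactSketch {
    static final String INLINE_COMPACT_KEY = "hoodie.compact.inline";

    static Map<String, String> scheduleAndMaybeExecute(boolean shouldExecute) {
        Map<String, String> commitMetadata = new HashMap<>();
        // Pretend a compaction plan was scheduled at some instant time.
        Optional<String> instant = Optional.of("20220101000000");
        if (shouldExecute && instant.isPresent()) {
            commitMetadata.put(INLINE_COMPACT_KEY, "true"); // flag set only on execution
            // ... execute compaction for instant.get() ...
        }
        return commitMetadata;
    }

    public static void main(String[] args) {
        System.out.println(scheduleAndMaybeExecute(false)); // prints "{}"
        System.out.println(scheduleAndMaybeExecute(true));  // prints "{hoodie.compact.inline=true}"
    }
}
```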
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
##########
@@ -133,6 +134,56 @@ public void testSimpleInsertAndUpdate(HoodieFileFormat fileFormat, boolean popul
   }
 }
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testInlineScheduleCompaction(boolean scheduleInlineCompaction) throws Exception {
+    HoodieFileFormat fileFormat = HoodieFileFormat.PARQUET;
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), fileFormat.toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    HoodieWriteConfig cfg = getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+      HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+      /*
+       * Write 1 (only inserts)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records, client, cfg, newCommitTime, true);
+      assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
+
+      /*
+       * Write 2 (updates)
+       */
+      newCommitTime = "004";
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUpdates(newCommitTime, 100);
+      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, true);
+
+      // validate compaction has been scheduled inline
+      /*HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
+      hoodieTable.getHoodieView().sync();
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
+      HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
+      assertTrue(dataFilesToRead.findAny().isPresent());*/
Review comment:
Let's remove this if not needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]