vinothchandar commented on a change in pull request #3590:
URL: https://github.com/apache/hudi/pull/3590#discussion_r719356462
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -241,13 +242,16 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
}
}
+ /**
+ * Any pre-commit actions like conflict resolution or updating metadata table goes here.
+ * @param instantTime commit instant time.
+ * @param metadata commit metadata for which pre commit is being invoked.
+ */
protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
- // no-op
- // TODO : Conflict resolution is not supported for Flink & Java engines
- }
-
- protected void syncTableMetadata() {
- // no-op
+ // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+ // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
Review comment:
It's pretty hard to reason about these locking issues from inside the method. Can we please move these comments to the caller?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -241,13 +242,16 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
}
}
+ /**
+ * Any pre-commit actions like conflict resolution or updating metadata table goes here.
+ * @param instantTime commit instant time.
+ * @param metadata commit metadata for which pre commit is being invoked.
+ */
protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
- // no-op
- // TODO : Conflict resolution is not supported for Flink & Java engines
- }
-
- protected void syncTableMetadata() {
- // no-op
+ // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
Review comment:
What's 'startTxn' here? Can we just use plain speak?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -280,23 +272,23 @@ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableM
/**
* Initialize the Metadata Table by listing files and partitions from the file system.
*
- * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+ * @param dataMetaClient {@code HoodieTableMetaClient} for the dataset.
*/
- private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
+ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient) throws IOException {
ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
// We can only bootstrap if there are no pending operations on the dataset
- Option<HoodieInstant> pendingInstantOption = Option.fromJavaOptional(datasetMetaClient.getActiveTimeline()
+ Option<HoodieInstant> pendingDataInstantOption = Option.fromJavaOptional(dataMetaClient.getActiveTimeline()
Review comment:
We can drop the "Option" suffix; it's kind of redundant.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -99,83 +94,94 @@ protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClie
@Override
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
- JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+ JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
- writeClient.startCommitWithTime(instantTime);
+ if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+ // if this is a new commit being applied to metadata for the first time
+ writeClient.startCommitWithTime(instantTime);
+ } else {
+ // this code path refers to a re-attempted commit that got committed to metadata, but failed in dataset.
+ // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+ // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+ // are upserts to metadata table and so only a new delta commit will be created.
+ // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
+ // already part of completed commit. So, we have to manually remove the completed instant and proceed.
+ // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
+ HoodieInstant alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
+ FSUtils.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
Review comment:
`deleteInstantFile` does not belong in FSUtils. FSUtils should be limited to things done using a filesystem instance. Can you move this to a method on the timeline? There are already methods like those in `HoodieActiveTimeline`.
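A hedged sketch of what moving the helper onto the timeline could look like. The class and method names below are hypothetical stand-ins (not Hudi's actual API), and `java.nio` substitutes for Hadoop's `FileSystem`; the point is the design choice: the timeline already owns the meta path, so callers stop threading `(fs, metaPath, instant)` triples through a static utility.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Minimal stand-in for a timeline class: deleting an instant file becomes an
// instance method on the object that owns the meta path, instead of a static
// FSUtils helper fed (fs, metaPath, instant) by every caller.
class ActiveTimelineSketch {
  private final Path metaPath;

  ActiveTimelineSketch(Path metaPath) {
    this.metaPath = metaPath;
  }

  // Hypothetical method; mirrors the shape of the delete helpers that
  // already exist on HoodieActiveTimeline.
  public void deleteInstantFile(String instantFileName) {
    Path instantFile = metaPath.resolve(instantFileName);
    try {
      if (!Files.deleteIfExists(instantFile)) {
        throw new IllegalStateException("Instant file not found: " + instantFile);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

With this shape, the review's call site would shrink to a single method call on the metadata timeline it already holds.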
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -85,4 +96,30 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return SparkHoodieIndex.createIndex(config);
}
+
+ /**
+ * Fetch instance of {@link HoodieTableMetadataWriter}.
+ *
+ * @return instance of {@link HoodieTableMetadataWriter}
+ */
+ @Override
+ public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+ synchronized (this) {
+ if (!isMetadataInfoUpdated.getAndSet(true)) {
Review comment:
Can we simplify?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -85,4 +96,30 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return SparkHoodieIndex.createIndex(config);
}
+
+ /**
+ * Fetch instance of {@link HoodieTableMetadataWriter}.
+ *
+ * @return instance of {@link HoodieTableMetadataWriter}
+ */
+ @Override
+ public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+ synchronized (this) {
+ if (!isMetadataInfoUpdated.getAndSet(true)) {
Review comment:
Why do we need this, if you are going to synchronize anyway?
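A minimal, self-contained sketch of the simplification being hinted at (field names and the `String` stand-in for the writer are illustrative, not the actual Hudi types): once the whole method body runs under the lock, the `AtomicBoolean.getAndSet` is redundant and a plain boolean guard does the same job.

```java
// Sketch: lazy one-time initialization under a synchronized method.
// Inside the lock, reads and writes of plain fields are already safe,
// so no atomic compare-and-set is needed.
class MetadataWriterHolder {
  private boolean metadataInfoUpdated = false;
  private String cachedWriter; // illustrative stand-in for Option<HoodieTableMetadataWriter>

  public synchronized String getMetadataWriter() {
    if (!metadataInfoUpdated) {
      cachedWriter = "writer-instance"; // hypothetical expensive initialization
      metadataInfoUpdated = true;
    }
    return cachedWriter;
  }
}
```

Every caller after the first then gets the same cached instance without re-running the initialization.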
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -241,13 +242,16 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
}
}
+ /**
+ * Any pre-commit actions like conflict resolution or updating metadata table goes here.
+ * @param instantTime commit instant time.
+ * @param metadata commit metadata for which pre commit is being invoked.
+ */
protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
- // no-op
- // TODO : Conflict resolution is not supported for Flink & Java engines
- }
-
- protected void syncTableMetadata() {
- // no-op
+ // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+ // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
Review comment:
Don't we already create a table instance from the caller? Can we not reuse it? Are you saying you need to see even the in-flight instants in this commit?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +394,82 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
}
/**
- * Sync the Metadata Table from the instants created on the dataset.
+ * Initialize file groups for a partition. For file listing, we just have one file group.
*
- * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+ * All FileGroups for a given metadata partition has a fixed prefix as per the {@link MetadataPartitionType#getFileIdPrefix()}.
+ * Each file group is suffixed with 4 digits with increments of 1 starting with 0.
+ *
+ * For instance, for FILES, there is only one file group named as "files-0000"
Review comment:
Let's trim this comment?
##########
File path:
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
##########
@@ -78,6 +81,13 @@ public boolean tryLock(long time, TimeUnit unit) {
&& (numRetries <= lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY))) {
Thread.sleep(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY));
}
+ // if two processes tries to acquire lock at the same time, above while loop will be bypassed and both will proceed on to acquire lock.
+ // locally, fs.create(lockFile) succeeds for both even though only one should succeed.
+ // hence adding this random sleep to guard against two processes acquiring same lock by mistake.
+ Thread.sleep(RANDOM.nextInt(10));
Review comment:
This will lead to flaky/non-deterministic tests. Let's fix it by other means?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]