vinothchandar commented on a change in pull request #3590:
URL: https://github.com/apache/hudi/pull/3590#discussion_r719356462



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -241,13 +242,16 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
     }
   }
 
+  /**
+   * Any pre-commit actions like conflict resolution or updating metadata table goes here.
+   * @param instantTime commit instant time.
+   * @param metadata commit metadata for which pre commit is being invoked.
+   */
   protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
-    // no-op
-    // TODO : Conflict resolution is not supported for Flink & Java engines
-  }
-
-  protected void syncTableMetadata() {
-    // no-op
+    // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+    // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload

Review comment:
       It's pretty hard to reason about these locking issues while inside the method. Can we please move these comments to the caller?
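For context, the ordering the inline comment is trying to document could be sketched at the caller like this. All names below are illustrative stand-ins, not the actual Hudi API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch of "lock first, then load": a table snapshot taken
// after acquiring the lock already contains commits finished by other writers.
class LockThenLoadSketch {
    final ReentrantLock txnLock = new ReentrantLock(); // stand-in for the transaction lock
    final List<String> commitsOnDisk = new ArrayList<>(); // stand-in for the timeline

    // Stand-in for HoodieTable creation: snapshots the timeline at build time.
    List<String> createTable() {
        return new ArrayList<>(commitsOnDisk);
    }

    List<String> preCommit() {
        txnLock.lock(); // 1. acquire the lock first
        try {
            // 2. only now build the table, so no reload is needed to see
            //    commits that completed while we waited for the lock
            return createTable();
        } finally {
            txnLock.unlock();
        }
    }
}
```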
   

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -241,13 +242,16 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
     }
   }
 
+  /**
+   * Any pre-commit actions like conflict resolution or updating metadata table goes here.
+   * @param instantTime commit instant time.
+   * @param metadata commit metadata for which pre commit is being invoked.
+   */
   protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
-    // no-op
-    // TODO : Conflict resolution is not supported for Flink & Java engines
-  }
-
-  protected void syncTableMetadata() {
-    // no-op
+    // Create a Hoodie table after startTxn which encapsulated the commits and files visible.

Review comment:
       What's 'startTxn' here? Can we just use plain speak?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -280,23 +272,23 @@ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableM
   /**
    * Initialize the Metadata Table by listing files and partitions from the file system.
    *
-   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   * @param dataMetaClient {@code HoodieTableMetaClient} for the dataset.
    */
-  private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
+  private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient) throws IOException {
     ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
 
     // We can only bootstrap if there are no pending operations on the dataset
-    Option<HoodieInstant> pendingInstantOption = Option.fromJavaOptional(datasetMetaClient.getActiveTimeline()
+    Option<HoodieInstant> pendingDataInstantOption = Option.fromJavaOptional(dataMetaClient.getActiveTimeline()

Review comment:
       We can drop "Option". It's kind of redundant.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -99,83 +94,94 @@ protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClie
   @Override
   protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
-    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
 
     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
-      writeClient.startCommitWithTime(instantTime);
+      if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+        // if this is a new commit being applied to metadata for the first time
+        writeClient.startCommitWithTime(instantTime);
+      } else {
+        // this code path refers to a re-attempted commit that got committed to metadata, but failed in dataset.
+        // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will be created.
+        // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually remove the completed instant and proceed.
+        // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
+        HoodieInstant alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
+        FSUtils.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant);

Review comment:
       `deleteInstantFile` does not belong in FSUtils. FSUtils should be limited to just things done using a filesystem instance. Can you move this to a method on the timeline? There are already methods like those in `HoodieActiveTimeline`.
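A possible shape for that move, sketched with stand-in types (the actual `HoodieActiveTimeline` signature may differ, and `TimelineSketch` below is purely illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of hosting the instant-file deletion on the timeline itself, so the
// filesystem details stay encapsulated there rather than in a static FSUtils
// helper. Types and names here are stand-ins, not the actual Hudi classes.
class TimelineSketch {
    private final Path metaPath; // stand-in for metaClient.getMetaPath()

    TimelineSketch(Path metaPath) {
        this.metaPath = metaPath;
    }

    // Deletes the file backing a completed instant; the caller no longer needs
    // to pass a FileSystem handle or the meta path explicitly.
    void deleteInstantFile(String instantFileName) throws IOException {
        Path instantFile = metaPath.resolve(instantFileName);
        if (!Files.deleteIfExists(instantFile)) {
            throw new IOException("Could not delete instant file " + instantFile);
        }
    }
}
```

With something like this, the call site above would read `metadataMetaClient.getActiveTimeline().deleteInstantFile(...)` instead of routing through `FSUtils`.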

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -85,4 +96,30 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
   protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
     return SparkHoodieIndex.createIndex(config);
   }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {

Review comment:
       Can we simplify?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -85,4 +96,30 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
   protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
     return SparkHoodieIndex.createIndex(config);
   }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {

Review comment:
       Why do we need this if you are going to synchronize anyway?
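One way to read the suggestion: inside a `synchronized` block a plain boolean field is already safe, so the `AtomicBoolean` `getAndSet` is redundant. A minimal sketch with stand-in names, not the actual `HoodieSparkTable` fields:

```java
// Sketch of the simplification: synchronized already gives mutual exclusion
// and visibility, so a plain boolean guard suffices for one-time init.
class LazyInitSketch {
    private boolean metadataInfoUpdated = false; // plain field, guarded by the lock
    private String writer; // stand-in for the HoodieTableMetadataWriter instance
    int initCount = 0; // counts constructions, for demonstration only

    synchronized String getMetadataWriter() {
        if (!metadataInfoUpdated) {
            metadataInfoUpdated = true;
            initCount++;
            writer = "metadata-writer"; // stand-in for the real construction
        }
        return writer;
    }
}
```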

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -241,13 +242,16 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
     }
   }
 
+  /**
+   * Any pre-commit actions like conflict resolution or updating metadata table goes here.
+   * @param instantTime commit instant time.
+   * @param metadata commit metadata for which pre commit is being invoked.
+   */
   protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
-    // no-op
-    // TODO : Conflict resolution is not supported for Flink & Java engines
-  }
-
-  protected void syncTableMetadata() {
-    // no-op
+    // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+    // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload

Review comment:
       Don't we already create a table instance from the caller? Can we not 
reuse it. Are you saying you need to see even the in-flight instants in this 
commit

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +394,82 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
   }
 
   /**
-   * Sync the Metadata Table from the instants created on the dataset.
+   * Initialize file groups for a partition. For file listing, we just have one file group.
    *
-   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   * All FileGroups for a given metadata partition has a fixed prefix as per the {@link MetadataPartitionType#getFileIdPrefix()}.
+   * Each file group is suffixed with 4 digits with increments of 1 starting with 0.
+   *
+   * For instance, for FILES, there is only one file group named as "files-0000"

Review comment:
       Let's trim this comment?
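For reference, the scheme the comment describes boils down to a zero-based, zero-padded 4-digit index appended to the partition's file id prefix (assuming the prefix already carries the trailing dash, as in "files-"):

```java
// Sketch of the file group naming described above: fixed per-partition prefix
// plus a 4-digit, zero-based, zero-padded suffix.
class FileGroupNamingSketch {
    static String fileGroupId(String fileIdPrefix, int index) {
        return fileIdPrefix + String.format("%04d", index);
    }
}
```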

##########
File path: hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
##########
@@ -78,6 +81,13 @@ public boolean tryLock(long time, TimeUnit unit) {
           && (numRetries <= lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY))) {
         Thread.sleep(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY));
       }
+      // if two processes tries to acquire lock at the same time, above while loop will be bypassed and both will proceed on to acquire lock.
+      // locally, fs.create(lockFile) succeeds for both even though only one should succeed.
+      // hence adding this random sleep to guard against two processes acquiring same lock by mistake.
+      Thread.sleep(RANDOM.nextInt(10));

Review comment:
       This will lead to flaky/non-deterministic tests? Let's fix it by other means.
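One deterministic alternative, sketched with java.nio rather than the Hadoop FileSystem the real test class uses (so this is an assumption about what an equivalent fix could look like): rely on an atomic create-if-absent instead of create-then-sleep.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Atomic create-if-absent: exactly one contender can create the lock file,
// so no random sleep is needed to break ties between concurrent acquirers.
class AtomicFileLockSketch {
    private final Path lockFile;

    AtomicFileLockSketch(Path lockFile) {
        this.lockFile = lockFile;
    }

    // Returns true iff this caller created the lock file.
    boolean tryLock() {
        try {
            Files.createFile(lockFile); // atomic: throws if the file already exists
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    void unlock() throws IOException {
        Files.deleteIfExists(lockFile);
    }
}
```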
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

