alexeykudinkin commented on code in PR #5269:
URL: https://github.com/apache/hudi/pull/5269#discussion_r998645726


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java:
##########
@@ -20,24 +20,37 @@
 package org.apache.hudi.async;
 
 import org.apache.hudi.client.RunsTableService;
+import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 public abstract class HoodieAsyncTableService extends HoodieAsyncService 
implements RunsTableService {
 
+  protected final Object writeConfigUpdateLock = new Object();
   protected HoodieWriteConfig writeConfig;
+  protected Option<EmbeddedTimelineService> embeddedTimelineService;
+  protected AtomicBoolean isWriteConfigUpdated = new AtomicBoolean(false);
 
   protected HoodieAsyncTableService() {
   }
 
-  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig) {
+  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig, 
Option<EmbeddedTimelineService> embeddedTimelineService) {
     this.writeConfig = writeConfig;
+    this.embeddedTimelineService = embeddedTimelineService;

Review Comment:
   Let's chain the ctors (so that we don't need to duplicate the init logic



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java:
##########
@@ -47,4 +60,11 @@ public void start(Function<Boolean, Boolean> 
onShutdownCallback) {
     }
     super.start(onShutdownCallback);
   }
+
+  public void updateWriteConfig(HoodieWriteConfig writeConfig) {
+    synchronized (writeConfigUpdateLock) {
+      this.writeConfig = 
EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(embeddedTimelineService.get(),
 writeConfig);
+      isWriteConfigUpdated.set(true);

Review Comment:
   Flag setup is quite brittle: it makes resetting a 2-step process (first 
update the flag, then update needs to be handled asynchronously), which is easy 
to miss during impl.
   
   Instead i'd suggest we 
    - Create abstract hook `handleWriteConfigUpdate` (hence make every 
inheritor implement it)
    - Invoke it here under lock (that way we keep the lock private and avoid 
this logic spilling into inheritors)
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java:
##########
@@ -20,24 +20,37 @@
 package org.apache.hudi.async;
 
 import org.apache.hudi.client.RunsTableService;
+import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 public abstract class HoodieAsyncTableService extends HoodieAsyncService 
implements RunsTableService {
 
+  protected final Object writeConfigUpdateLock = new Object();
   protected HoodieWriteConfig writeConfig;
+  protected Option<EmbeddedTimelineService> embeddedTimelineService;
+  protected AtomicBoolean isWriteConfigUpdated = new AtomicBoolean(false);
 
   protected HoodieAsyncTableService() {
   }
 
-  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig) {
+  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig, 
Option<EmbeddedTimelineService> embeddedTimelineService) {
     this.writeConfig = writeConfig;
+    this.embeddedTimelineService = embeddedTimelineService;
   }
 
-  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig, boolean 
runInDaemonMode) {
+  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig, 
Option<EmbeddedTimelineService> embeddedTimelineService, boolean 
runInDaemonMode) {
     super(runInDaemonMode);
-    this.writeConfig = writeConfig;
+    this.embeddedTimelineService = embeddedTimelineService;
+    if (embeddedTimelineService.isPresent()) {

Review Comment:
   Why the same logic isn't applied in the other ctor?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java:
##########
@@ -78,13 +81,26 @@ protected Pair<CompletableFuture, ExecutorService> 
startService() {
 
         while (!isShutdownRequested()) {
           final HoodieInstant instant = fetchNextAsyncServiceInstant();
-
           if (null != instant) {
             LOG.info("Starting Compaction for instant " + instant);
+            synchronized (writeConfigUpdateLock) {

Review Comment:
   Please check my comment below regarding the flag



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to