vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r568841211



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -188,6 +203,8 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
       postCommit(table, metadata, instantTime, extraMetadata);
       emitCommitMetrics(instantTime, metadata, commitActionType);
       LOG.info("Committed " + instantTime);
+      // Reset the last completed write instant
+      latestCompletedWriteInstant.set(null);

Review comment:
       How about empty instead of null? In general, I like to avoid using `null` for any kind of sentinel.
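
A minimal sketch of the suggestion, using `java.util.Optional` as a stand-in for Hudi's own `Option` type (the class and field names here are illustrative, not the PR's actual code):

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public class SentinelSketch {
    // Initialized to an explicit empty value; the reference itself is never null.
    static final AtomicReference<Optional<String>> latestCompletedWriteInstant =
        new AtomicReference<>(Optional.empty());

    // After a commit, reset with Optional.empty() instead of a null sentinel,
    // so callers can always call isPresent() without a null check.
    static void resetAfterCommit() {
        latestCompletedWriteInstant.set(Optional.empty());
    }

    public static void main(String[] args) {
        latestCompletedWriteInstant.set(Optional.of("20210203120000"));
        resetAfterCommit();
        System.out.println(latestCompletedWriteInstant.get().isPresent()); // prints "false"
    }
}
```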

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Different concurrency modes for write operations.
+ */
+public enum WriteConcurrencyMode {
+  NO_WRITER("no_writer"),

Review comment:
       What's `NO_WRITER`? It's kind of difficult to understand. Can we remove this?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Different concurrency modes for write operations.
+ */
+public enum WriteConcurrencyMode {
+  NO_WRITER("no_writer"),
+  // Only a single writer can perform write ops
+  SINGLE_WRITER("single_writer"),
+  // Multiple writer can perform write ops with lazy conflict resolution using locks
+  OPTIMISTIC_CONCURRENCY_CONTROL_SHARED_LOCK("optimistic_concurrency_control_shared_lock");
+
+  private final String value;
+
+  WriteConcurrencyMode(String value) {
+    this.value = value;
+  }
+
+  /**
+   * Getter for write concurrency mode.
+   * @return
+   */
+  public String value() {
+    return value;
+  }
+
+  /**
+   * Convert string value to WriteConcurrencyMode.
+   */
+  public static WriteConcurrencyMode fromValue(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "no_writer":
+        return NO_WRITER;
+      case "single_writer":
+        return SINGLE_WRITER;
+      case "optimistic_concurrency_control_shared_lock":
+        return OPTIMISTIC_CONCURRENCY_CONTROL_SHARED_LOCK;
+      default:
+        throw new HoodieException("Invalid value of Type.");
+    }
+  }
+
+  public boolean isMultiWriter() {

Review comment:
       Rename to `supportsOptimisticConcurrencyControl()` to match the mode name.
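
A sketch of what the suggested rename could look like (the enum is abbreviated from the PR's `WriteConcurrencyMode` for brevity; this is not the PR's code):

```java
public class ConcurrencyModeSketch {
    enum WriteConcurrencyMode {
        SINGLE_WRITER,
        OPTIMISTIC_CONCURRENCY_CONTROL_SHARED_LOCK;

        // The name now mirrors the mode it checks, instead of the vaguer isMultiWriter().
        boolean supportsOptimisticConcurrencyControl() {
            return this == OPTIMISTIC_CONCURRENCY_CONTROL_SHARED_LOCK;
        }
    }

    public static void main(String[] args) {
        System.out.println(
            WriteConcurrencyMode.SINGLE_WRITER.supportsOptimisticConcurrencyControl()); // false
        System.out.println(
            WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL_SHARED_LOCK
                .supportsOptimisticConcurrencyControl()); // true
    }
}
```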

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -599,6 +637,7 @@ public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws H
  public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
     LOG.info("Cleaner started");
     final Timer.Context timerContext = metrics.getCleanCtx();
+    scheduleCleaningAtInstant(cleanInstantTime, Option.empty());

Review comment:
       Why was this change needed?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -239,6 +272,9 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
     if (rollbackPending) {
       rollBackInflightBootstrap();
     }
+    if (config.getWriteConcurrencyMode().isMultiWriter()) {

Review comment:
       Do we need this check with OCC mode? In any case, we should ensure the bootstrap code downgrades to single writer, so users don't have to worry about this. Most people do a bootstrap followed by writing anyway.
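
One way to read the "downgrade during bootstrap" idea, as a hedged sketch (all names here are hypothetical, not the PR's code): the effective mode for the bootstrap step ignores the configured multi-writer setting.

```java
public class BootstrapModeSketch {
    enum WriteConcurrencyMode { SINGLE_WRITER, OPTIMISTIC_CONCURRENCY_CONTROL_SHARED_LOCK }

    // Bootstrap is a one-time, single-process step, so it can run as a lone
    // writer regardless of the configured concurrency mode.
    static WriteConcurrencyMode effectiveBootstrapMode(WriteConcurrencyMode configured) {
        return WriteConcurrencyMode.SINGLE_WRITER;
    }

    public static void main(String[] args) {
        WriteConcurrencyMode configured = WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL_SHARED_LOCK;
        // Users keep their OCC config; bootstrap quietly downgrades for itself.
        System.out.println(effectiveBootstrapMode(configured)); // SINGLE_WRITER
    }
}
```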

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -859,6 +973,68 @@ protected void finalizeWrite(HoodieTable<T, I, K, O> table, String instantTime,
     }
   }
 
+  private HoodieCommitMetadata resolveWriteConflictIfAny(HoodieTable<T, I, K, O> table, HoodieBackedTableMetadataWriter metadataWriter,
+                                                         final String instantTime, HoodieCommitMetadata thisCommitMetadata)
+      throws HoodieWriteConflictException {
+    ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
+    Option<HoodieInstant> lastCompletedInstantBeforeWriterStarted = this.latestCompletedWriteInstant.get();
+    String lastInstantTimestamp = lastCompletedInstantBeforeWriterStarted.isPresent()
+        ? lastCompletedInstantBeforeWriterStarted.get().getTimestamp() : "0";
+    // Get completed instants timeline
+    Stream<HoodieInstant> completedInstantStream = table.getActiveTimeline()
+        .getAllCommitsTimeline()
+        // TODO : getWriteTimeline to ensure we include replace commits as well
+        .getCommitsAndCompactionTimeline()
+        .filterCompletedInstants()
+        .findInstantsAfter(lastInstantTimestamp)
+        .getInstants();
+
+    // Get pending replace and compaction instants timeline
+    Stream<HoodieInstant> pendingReplaceAndRequestedInstantStream = table.getActiveTimeline()
+        .getAllCommitsTimeline()
+        .filterPendingCompactionAndReplaceTimeline()
+        .findInstantsAfter(lastInstantTimestamp)
+        .getInstants();
+
+    Stream<HoodieInstant> instantStream = Stream.concat(completedInstantStream, pendingReplaceAndRequestedInstantStream);
+    final HoodieCommitOperation thisOperation = new HoodieCommitOperation(thisCommitMetadata, instantTime);
+    instantStream.forEach(instant -> {
+      try {
+        HoodieCommitOperation otherOperation = new HoodieCommitOperation(HoodieCommitMetadata.fromBytes(
+            table.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class), instant.getTimestamp());
+        if (resolutionStrategy.hasConflict(thisOperation, otherOperation)) {
+          LOG.info("Conflict encountered between instant = " + thisOperation.getInstant() + " and instant = "
+              + otherOperation.getInstant() + ", attempting to resolve it now");
+          resolutionStrategy.resolveConflict(metadataWriter, table, thisOperation, otherOperation);
+        }
+      } catch (IOException io) {
+        throw new HoodieWriteConflictException("Unable to resolve conflict, if present", io);
+      }
+    });
+    return thisOperation.getCommitMetadata();
+  }
+
+  private boolean executeCriticalSection(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,

Review comment:
       Same with this.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -230,13 +235,18 @@ protected void initTableMetadata() {
     }
   }
 
-  protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    boolean exists = datasetMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files directly from the file system
-      bootstrapFromFilesystem(engineContext, datasetMetaClient);
-      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+  protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) {

Review comment:
       We can also rename this method if needed.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -403,30 +439,32 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
      // Delete the marker directory for the instant.
      new MarkerFiles(table, instantTime).quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
 
-      // Do an inline compaction if enabled
-      if (config.isInlineCompaction()) {
-        runAnyPendingCompactions(table);
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
-        inlineCompact(extraMetadata);
-      } else {
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
-      }
+      if (config.isInlineTableServiceEnabled()) {

Review comment:
       We should also protect the scheduling, correct? (Maybe it comes down the line.)

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/CommitMetadataUtils.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieWriteStat;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to manipulate commit metadata.
+ */
+public class CommitMetadataUtils {

Review comment:
       Let's just add this to an existing class like `CommitUtils`.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -220,7 +253,7 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
     }
   }
 
-  protected void syncTableMetadata() {
+  protected void syncTableMetadata(boolean bootstrapIfNeeded) {

Review comment:
       Let's call this `initIfNeeded`, to avoid overloading `bootstrap`, which has its own meaning.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/TableService.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Supported runtime table services.
+ */
+public enum TableService {
+  COMPACT("compact"),

Review comment:
       Should archival be here too?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -403,30 +439,32 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
      // Delete the marker directory for the instant.
      new MarkerFiles(table, instantTime).quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
 
-      // Do an inline compaction if enabled
-      if (config.isInlineCompaction()) {
-        runAnyPendingCompactions(table);
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
-        inlineCompact(extraMetadata);
-      } else {
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
-      }
+      if (config.isInlineTableServiceEnabled()) {
+        // Do an inline compaction if enabled
+        if (config.isInlineCompaction()) {
+          runAnyPendingCompactions(table);
+          metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
+          inlineCompact(extraMetadata);
+        } else {
+          metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
+        }
 
-      // Do an inline clustering if enabled
-      if (config.isInlineClustering()) {
-        runAnyPendingClustering(table);
-        metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true");
-        inlineCluster(extraMetadata);
-      } else {
-        metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "false");
-      }
+        // Do an inline clustering if enabled

Review comment:
       Just double-check that the indents are okay here.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/TableService.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Supported runtime table services.
+ */
+public enum TableService {
+  COMPACT("compact"),
+  CLUSTER("cluster"),
+  CLEAN("clean");
+  private final String value;

Review comment:
       Rename to `String name`. Actually, I don't see the need for this field at the moment.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -403,30 +439,32 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
      // Delete the marker directory for the instant.
      new MarkerFiles(table, instantTime).quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
 
-      // Do an inline compaction if enabled
-      if (config.isInlineCompaction()) {
-        runAnyPendingCompactions(table);
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
-        inlineCompact(extraMetadata);
-      } else {
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
-      }
+      if (config.isInlineTableServiceEnabled()) {

Review comment:
       So, this block only enables the inline execution of the table services? Or does it also cover the scheduling of cleaning, compaction, clustering, etc.?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -41,7 +41,8 @@
   public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy";
   public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
   public static final String ASYNC_CLEAN_PROP = "hoodie.clean.async";
-
+  // Turn on inline cleaning
+  public static final String INLINE_CLEAN_PROP = "hoodie.clean.inline";

Review comment:
       We should check somewhere that the user cannot turn on both async and inline cleaning, if we are adding an explicit config.
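
A sketch of the kind of validation being suggested (the property keys come from the diff; the guard itself is hypothetical, not code from the PR):

```java
import java.util.Properties;

public class CleanConfigGuardSketch {
    static final String ASYNC_CLEAN_PROP = "hoodie.clean.async";
    static final String INLINE_CLEAN_PROP = "hoodie.clean.inline";

    // Reject configurations that enable both async and inline cleaning.
    static void validateCleaningConfig(Properties props) {
        boolean async = Boolean.parseBoolean(props.getProperty(ASYNC_CLEAN_PROP, "false"));
        boolean inline = Boolean.parseBoolean(props.getProperty(INLINE_CLEAN_PROP, "false"));
        if (async && inline) {
            throw new IllegalArgumentException(
                "Only one of " + ASYNC_CLEAN_PROP + " and " + INLINE_CLEAN_PROP + " can be enabled");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ASYNC_CLEAN_PROP, "true");
        props.setProperty(INLINE_CLEAN_PROP, "true");
        try {
            validateCleaningConfig(props);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```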

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -98,6 +112,7 @@
   private transient HoodieWriteCommitCallback commitCallback;
   private transient AsyncCleanerService asyncCleanerService;
   protected final boolean rollbackPending;
+  protected AtomicReference<Option<HoodieInstant>> latestCompletedWriteInstant = new AtomicReference<>();

Review comment:
       We should clearly annotate/document that we do not support multiple writers on the same write client instance.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -907,11 +1083,20 @@ public void close() {
     // release AsyncCleanerService
     AsyncCleanerService.forceShutdown(asyncCleanerService);
     asyncCleanerService = null;
-
     // Stop timeline-server if running
     super.close();
     // Calling this here releases any resources used by your index, so make sure to finish any related operations
     // before this point
     this.index.close();
+    // HiveMetastoreClient does not implement AutoCloseable. Additionally, we cannot call close() after unlock()

Review comment:
       Please move these comments into the actual implementation, not this file.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -389,6 +397,10 @@ public boolean isAsyncClean() {
     return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.ASYNC_CLEAN_PROP));
   }
 
+  public boolean isInlineCleaning() {

Review comment:
       This does not read easily. Rename to `inlineCleaningEnabled()` or `shouldCleanInline()` or something like that? (Same wherever applicable.)

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -859,6 +973,68 @@ protected void finalizeWrite(HoodieTable<T, I, K, O> table, String instantTime,
     }
   }
 
+  private HoodieCommitMetadata resolveWriteConflictIfAny(HoodieTable<T, I, K, O> table, HoodieBackedTableMetadataWriter metadataWriter,

Review comment:
       Can this code sit somewhere else? We should try to keep the write client file pretty lean, doing only the control flow.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -923,6 +935,39 @@ public int getMetadataCleanerCommitsRetained() {
     return Integer.parseInt(props.getProperty(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED_PROP));
   }
 
+  /**
+   * Hoodie Client Lock Configs.
+   * @return
+   */
+
+  public String getLockProviderClass() {
+    return props.getProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);
+  }
+
+  public String getLockHiveDatabaseName() {
+    return props.getProperty(HIVE_DATABASE_NAME_PROP);
+  }
+
+  public String getLockHiveTableName() {
+    return props.getProperty(HIVE_TABLE_NAME_PROP);
+  }
+
+  public ConflictResolutionStrategy getWriteConflictResolutionStrategy() {
+    return ReflectionUtils.loadClass(props.getProperty(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP));
+  }
+
+  public Long getLockAcquireWaitTimeoutInMs() {
+    return Long.valueOf(props.getProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP));
+  }
+
+  public WriteConcurrencyMode getWriteConcurrencyMode() {
+    return WriteConcurrencyMode.fromValue(props.getProperty(WRITE_CONCURRENCY_MODE_PROP));
+  }
+
+  public Boolean isInlineTableServiceEnabled() {

Review comment:
       Same. Rename to `inlineTableServices()`?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -88,7 +88,7 @@
   protected SerializableConfiguration hadoopConf;
   protected final transient HoodieEngineContext engineContext;
 
-  protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
+  protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, boolean bootstrapIfNeeded) {

Review comment:
       Rename to `initIfNeeded` consistently.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

