vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r587049910



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -30,16 +31,19 @@
 import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.lock.TransactionManager;

Review comment:
       `lock` as a package name feels off to me. Can we have 
`org.apache.hudi.client.transaction.TransactionManager`? 
   Then `.lock` can be a sub-package under it, i.e. `.transaction.lock`. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -30,16 +31,19 @@
 import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.lock.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableService;

Review comment:
       So, the `model` package should just contain POJOs, i.e. data structure 
objects. Let's move `TableService` elsewhere.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -210,6 +231,11 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
     }
   }
 
+  protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    // no-op
+    // TODO : Conflict resolution is not support for Flink,Java engines

Review comment:
       typo: not supported

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -797,7 +853,9 @@ public Boolean rollbackFailedWrites() {
   * Performs a compaction operation on a table, serially before or after an insert/upsert action.
   */
  protected Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
-    Option<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
+    String schedulingCompactionInstant = HoodieActiveTimeline.createNewInstantTime();

Review comment:
       rename: `compactionInstantTime` 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConflictResolutionStrategy.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.stream.Stream;
+
+/**
+ * Strategy interface for conflict resolution with multiple writers.
+ * Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
+ * writers are mutating the hoodie table.
+ */
+public interface ConflictResolutionStrategy {
+
+  /**
+   * Stream of instants to check conflicts against.
+   * @return
+   */
+  Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
+
+  /**
+   * Implementations of this method will determine whether a conflict exists between 2 commits.
+   * @param thisOperation
+   * @param otherOperation
+   * @return
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  boolean hasConflict(HoodieCommitOperation thisOperation, HoodieCommitOperation otherOperation);

Review comment:
       this reads nicely :)

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;

Review comment:
       how are you testing this? I don't see curator added to any of the 
bundles, and the same goes for the zookeeper dependencies. This is a really, 
really important part.
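For reference, a sketch of what pulling the curator/zookeeper dependencies into a bundle pom might look like. The group/artifact ids are the real Maven coordinates for these libraries, but the version properties and placement are assumptions, not what the PR actually does:

```xml
<!-- Hypothetical additions to a bundle pom; version properties are placeholders. -->
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-framework</artifactId>
  <version>${curator.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>${curator.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>${zookeeper.version}</version>
</dependency>
```

The bundles would also need matching `maven-shade-plugin` includes for these artifacts to actually end up in the fat jars.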

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able to use this lock.
+ */
+@NotThreadSafe
+public class ZookeeperBasedLockProvider extends LockProvider {
+
+  private static final Logger LOG = LogManager.getLogger(ZookeeperBasedLockProvider.class);
+
+  private CuratorFramework curatorFrameworkClient;
+  private volatile InterProcessMutex lock = null;
+
+  public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
+    this(lockConfiguration);
+    this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
+        .connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP))
+        .retryPolicy(new BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
+            5000, lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP)))
+        .sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP, DEFAULT_ZK_SESSION_TIMEOUT_MS))
+        .connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP, DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
+        .build();
+    this.curatorFrameworkClient.start();
+  }
+
+  public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, final CuratorFramework curatorFrameworkClient) {
+    this(lockConfiguration);
+    this.curatorFrameworkClient = curatorFrameworkClient;
+    synchronized (this.curatorFrameworkClient) {
+      if (this.curatorFrameworkClient.getState() != CuratorFrameworkState.STARTED) {
+        this.curatorFrameworkClient.start();
+      }
+    }
+  }
+
+  ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration) {
+    checkRequiredProps(lockConfiguration);
+    this.lockConfiguration = lockConfiguration;
+  }
+  
+  public void acquireLock(long time, TimeUnit unit) throws Exception {

Review comment:
       why public?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -359,10 +388,21 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
   * Common method containing steps to be performed before write (upsert/insert/...
   * @param instantTime
   * @param writeOperationType
+   * @param metaClient
   */
-  protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
+  protected void preWrite(String instantTime, WriteOperationType writeOperationType,
+      HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
-    syncTableMetadata();
+    this.txnManager.setLastCompletedTransaction(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+        .lastInstant());
+    LOG.info("Last Instant Cached by writer with instant " + instantTime + " is " + this.txnManager.getLastCompletedTransactionOwner());
+    this.txnManager.setTransactionOwner(Option.of(new HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)));
+    this.txnManager.beginTransaction();

Review comment:
       can the above setters be passed to an overloaded `beginTransaction(..)` 
call? Whenever we have a contract that some setters must be called ahead of a 
`beginTransaction()`, it makes the code harder to maintain and read.
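A minimal sketch of the API shape the comment suggests, with the transaction owner and last-completed instant passed at begin time so the signature enforces the ordering. The class and method names below are stand-ins for illustration, not the real Hudi classes:

```java
import java.util.Optional;

// Stand-in for org.apache.hudi.common.table.timeline.HoodieInstant; illustration only.
class HoodieInstantStub {
  final String state;
  final String action;
  final String timestamp;

  HoodieInstantStub(String state, String action, String timestamp) {
    this.state = state;
    this.action = action;
    this.timestamp = timestamp;
  }
}

// Sketch of a transaction manager where begin takes all required state,
// so callers cannot forget (or reorder) the setters.
class TransactionManagerSketch {
  private Optional<HoodieInstantStub> txnOwner = Optional.empty();
  private Optional<HoodieInstantStub> lastCompletedTxn = Optional.empty();
  private boolean active = false;

  // The overload suggested in the review: owner + last-completed instant
  // are parameters of beginTransaction itself.
  void beginTransaction(Optional<HoodieInstantStub> owner,
                        Optional<HoodieInstantStub> lastCompleted) {
    this.txnOwner = owner;
    this.lastCompletedTxn = lastCompleted;
    this.active = true; // the real implementation would acquire the lock here
  }

  void endTransaction() {
    this.active = false; // the real implementation would release the lock here
  }

  boolean isActive() {
    return active;
  }

  Optional<HoodieInstantStub> getTxnOwner() {
    return txnOwner;
  }
}
```

With this shape, the `preWrite()` caller passes the timeline lookups straight into `beginTransaction(..)` and no pre-begin setter contract exists.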

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -114,6 +115,7 @@
       HoodieFailedWritesCleaningPolicy.EAGER.name();
   private static final String DEFAULT_AUTO_CLEAN = "true";
   private static final String DEFAULT_ASYNC_CLEAN = "false";
+  private static final String DEFAULT_INLINE_CLEAN = DEFAULT_AUTO_CLEAN;

Review comment:
       auto clean is different from the cleaning mode itself. Let's just assign 
the hardcoded string `"true"` here?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.lock.SimpleConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.lock.LockProvider;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_PORT_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+
+/**
+ * Hoodie Configs for Locks.
+ */
+public class HoodieLockConfig extends DefaultHoodieConfig {
+
+  // Pluggable type of lock provider
+  public static final String LOCK_PROVIDER_CLASS_PROP = LOCK_PREFIX + "provider";
+  public static final String DEFAULT_LOCK_PROVIDER_CLASS = ZookeeperBasedLockProvider.class.getName();
+  // Pluggable strategies to use when resolving conflicts
+  public static final String WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP =
+      LOCK_PREFIX + "conflict.resolution.strategy";
+  public static final String DEFAULT_WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS =
+      SimpleConcurrentFileWritesConflictResolutionStrategy.class.getName();
+
+  private HoodieLockConfig(Properties props) {
+    super(props);
+  }
+
+  public static HoodieLockConfig.Builder newBuilder() {
+    return new HoodieLockConfig.Builder();
+  }
+
+  public static class Builder {
+
+    private final Properties props = new Properties();
+
+    public HoodieLockConfig.Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        this.props.load(reader);
+        return this;
+      }
+    }
+
+    public HoodieLockConfig.Builder fromProperties(Properties props) {
+      this.props.putAll(props);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withLockProvider(Class<? extends LockProvider> lockProvider) {
+      props.setProperty(LOCK_PROVIDER_CLASS_PROP, lockProvider.getName());
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withHiveDatabaseName(String databaseName) {
+      props.setProperty(HIVE_DATABASE_NAME_PROP, databaseName);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withHiveTableName(String tableName) {
+      props.setProperty(HIVE_TABLE_NAME_PROP, tableName);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkQuorum(String zkQuorum) {
+      props.setProperty(ZK_CONNECT_URL_PROP, zkQuorum);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkBasePath(String zkBasePath) {
+      props.setProperty(ZK_BASE_PATH_PROP, zkBasePath);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkPort(String zkPort) {
+      props.setProperty(ZK_PORT_PROP, zkPort);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkLockKey(String zkLockKey) {
+      props.setProperty(ZK_LOCK_KEY_PROP, zkLockKey);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkConnectionTimeoutInMs(Long connectionTimeoutInMs) {
+      props.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP, String.valueOf(connectionTimeoutInMs));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkSessionTimeoutInMs(Long sessionTimeoutInMs) {
+      props.setProperty(ZK_SESSION_TIMEOUT_MS_PROP, String.valueOf(sessionTimeoutInMs));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withNumRetries(int numRetries) {
+      props.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP, String.valueOf(numRetries));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withRetryWaitTimeInMillis(Long retryWaitTimeInMillis) {
+      props.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(retryWaitTimeInMillis));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withClientNumRetries(int clientNumRetries) {
+      props.setProperty(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP, String.valueOf(clientNumRetries));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withClientRetryWaitTimeInMillis(Long clientRetryWaitTimeInMillis) {
+      props.setProperty(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(clientRetryWaitTimeInMillis));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withLockWaitTimeInMillis(Long waitTimeInMillis) {
+      props.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP, String.valueOf(waitTimeInMillis));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withConflictResolutionStrategy(ConflictResolutionStrategy conflictResolutionStrategy) {
+      props.setProperty(WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP, conflictResolutionStrategy.getClass().getName());
+      return this;
+    }
+
+    public HoodieLockConfig build() {

Review comment:
       given we have had some typo-related issues recently, please check each 
line once for correctness.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -66,6 +70,11 @@ public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig c
     this.operationType = operationType;
     this.extraMetadata = extraMetadata;
     this.taskContextSupplier = context.getTaskContextSupplier();
+    this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
+    // TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link AbstractHoodieWriteClient}.

Review comment:
       let's create a code-cleanup JIRA for this; else we may not get to it.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -193,8 +200,22 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
     return true;
   }
 
+  protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
+                      List<HoodieWriteStat> stats) throws IOException {
+    preCommit(instantTime, metadata);

Review comment:
       lets move the `preCommit()` call out of here? `commit()` calling 
`preCommit()` is bit confusing to read
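A toy sketch of the sequencing the comment asks for: the outer entry point calls `preCommit()` and `commit()` explicitly, instead of `commit()` hiding the `preCommit()` call inside itself. Method names mirror `AbstractHoodieWriteClient` but the bodies are stand-ins that just record call order:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: a caller that sequences the commit phases explicitly.
class WriteClientSketch {
  final List<String> phases = new ArrayList<>();

  // The entry point owns the ordering, so commit() stays a "pure" commit.
  void commitStats(String instantTime) {
    preCommit(instantTime); // conflict-resolution / txn hooks, visible at the call site
    commit(instantTime);    // finalize the instant on the timeline
  }

  void preCommit(String instantTime) {
    phases.add("preCommit:" + instantTime);
  }

  void commit(String instantTime) {
    phases.add("commit:" + instantTime);
  }
}
```

The benefit is readability: anyone scanning `commitStats()` sees both phases and their order, rather than having to open `commit()` to discover a nested hook.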

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -389,29 +429,33 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
       // Delete the marker directory for the instant.
       new MarkerFiles(table, instantTime).quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
 
-      // Do an inline compaction if enabled
-      if (config.isInlineCompaction()) {
-        runAnyPendingCompactions(table);
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
-        inlineCompact(extraMetadata);
-      } else {
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
-      }
+      if (config.inlineTableServices()) {
+        // Do an inline compaction if enabled
+        if (config.inlineCompactionEnabled()) {
+          runAnyPendingCompactions(table);
+          metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
+          inlineCompact(extraMetadata);
+        } else {
+          metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
+        }
 
-      // Do an inline clustering if enabled
-      if (config.isInlineClustering()) {
-        runAnyPendingClustering(table);
-        metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true");
-        inlineCluster(extraMetadata);
-      } else {
-        metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "false");
+        // Do an inline clustering if enabled
+        if (config.inlineClusteringEnabled()) {
+          runAnyPendingClustering(table);
+          metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true");
+          inlineCluster(extraMetadata);
+        } else {
+          metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "false");

Review comment:
       duplicate line?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConflictResolutionStrategy.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.stream.Stream;
+
+/**
+ * Strategy interface for conflict resolution with multiple writers.
+ * Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
+ * writers are mutating the hoodie table.
+ */
+public interface ConflictResolutionStrategy {
+
+  /**
+   * Stream of instants to check conflicts against.
+   * @return
+   */
+  Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);

Review comment:
       rename: `getCandidateInstants()` to clarify intent.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -359,10 +388,21 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
   * Common method containing steps to be performed before write (upsert/insert/...
   * @param instantTime
   * @param writeOperationType
+   * @param metaClient
   */
-  protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
+  protected void preWrite(String instantTime, WriteOperationType writeOperationType,
+      HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
-    syncTableMetadata();
+    this.txnManager.setLastCompletedTransaction(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+        .lastInstant());
+    LOG.info("Last Instant Cached by writer with instant " + instantTime + " is " + this.txnManager.getLastCompletedTransactionOwner());
+    this.txnManager.setTransactionOwner(Option.of(new HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)));
+    this.txnManager.beginTransaction();
+    try {
+      syncTableMetadata();
+    } finally {
+      this.txnManager.endTransaction();

Review comment:
       do we need to distinguish between `endTransaction()` and an abort? i.e. 
are there any cleanups to be done in the transaction manager here upon an 
exception?
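One way the end/abort split could look, as a minimal sketch. The method names and the state that gets cleared are hypothetical, not the PR's actual API; the point is that the failure path discards per-transaction bookkeeping so nothing leaks into the next transaction:

```java
// Illustration only: a transaction manager separating normal completion from abort.
class TxnManagerSketch {
  enum State { NONE, ACTIVE, ENDED, ABORTED }

  private State state = State.NONE;

  void beginTransaction() {
    state = State.ACTIVE; // acquire the lock here
  }

  // Normal completion: just release the lock.
  void endTransaction() {
    state = State.ENDED; // release the lock here
  }

  // Failure path: release the lock AND discard per-transaction state
  // (owner, cached last-completed instant), so a half-finished transaction
  // cannot influence the next beginTransaction().
  void abortTransaction() {
    state = State.ABORTED; // release lock + clear owner/bookkeeping here
  }

  State state() {
    return state;
  }
}
```

With this split, a `finally` block like the one in the quoted `preWrite()` diff could call `abortTransaction()` on the exception path and `endTransaction()` on success, instead of one method serving both cases.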

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConflictResolutionStrategy.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.stream.Stream;
+
+/**
+ * Strategy interface for conflict resolution with multiple writers.
+ * Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
+ * writers are mutating the hoodie table.
+ */
+public interface ConflictResolutionStrategy {
+
+  /**
+   * Stream of instants to check conflicts against.
+   * @return
+   */
+  Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
+
+  /**
+   * Implementations of this method will determine whether a conflict exists between 2 commits.
+   * @param thisOperation
+   * @param otherOperation
+   * @return
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  boolean hasConflict(HoodieCommitOperation thisOperation, HoodieCommitOperation otherOperation);
+
+  /**
+   * Implementations of this method will determine how to resolve a conflict between 2 commits.
+   * @param thisOperation
+   * @param otherOperation
+   * @return
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  Option<HoodieCommitMetadata> resolveConflict(Option<HoodieBackedTableMetadataWriter> metadataWriter, HoodieTable table,

Review comment:
       having the metadata writer passed in feels off. any way to avoid this?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/LockManager.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class wraps implementations of {@link LockProvider} and provides an easy way to manage the lifecycle of a lock.
+ */
+public class LockManager implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(LockManager.class);
+  private final HoodieWriteConfig writeConfig;
+  private final LockConfiguration lockConfiguration;
+  private final SerializableConfiguration hadoopConf;
+  private volatile LockProvider lockProvider;
+  // Holds the latest completed write instant to know which ones to check conflict against
+  private final AtomicReference<Option<HoodieInstant>> latestCompletedWriteInstant;
+
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+    this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty());
+    this.writeConfig = writeConfig;
+    this.hadoopConf = new SerializableConfiguration(fs.getConf());
+    this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
+  }
+
+  public void lock() {
+    if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      LockProvider lockProvider = getLockProvider();
+      boolean acquired = false;
+      try {
+        int retries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP);
+        long waitTimeInMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP);
+        int retryCount = 0;
+        while (retryCount <= retries) {
+          acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
+          if (acquired) {
+            break;
+          }
+          LOG.info("Retrying...");
+          Thread.sleep(waitTimeInMs);
+          retryCount++;
+        }
+      } catch (Exception e) {

Review comment:
       any special handling for InterruptedException? this is a common cause of bugs in such locking code paths
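To illustrate the concern: a sketch of a retry loop that treats InterruptedException as a signal to stop retrying and restores the thread's interrupt status, instead of letting it fall into a blanket `catch (Exception e)`. The `InterruptAwareRetry` class and its parameters are hypothetical, not the PR's API.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

// Hypothetical helper: retries tryLock, but on InterruptedException restores
// the thread's interrupt flag and gives up, rather than wrapping the
// exception in a generic lock exception and losing the interrupt status.
public class InterruptAwareRetry {
  public static boolean tryAcquire(Lock lock, int retries, long retryWaitMs, long acquireTimeoutMs) {
    for (int attempt = 0; attempt <= retries; attempt++) {
      try {
        if (lock.tryLock(acquireTimeoutMs, TimeUnit.MILLISECONDS)) {
          return true;
        }
        Thread.sleep(retryWaitMs);
      } catch (InterruptedException ie) {
        // Restore the flag so callers up the stack can observe the interrupt.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }
}
```

The key design point is that swallowing the interrupt (as a catch-all does) can make a writer un-cancellable while it spins on lock acquisition.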

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/TransactionManager.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+
+/**
+ * This class allows clients to start and end transactions. Anything done between a start and end transaction is
+ * guaranteed to be atomic.
+ */
+public class TransactionManager implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(TransactionManager.class);
+
+  private final LockManager lockManager;
+  private Option<HoodieInstant> currentTxnOwnerInstant;
+  private Option<HoodieInstant> lastCompletedTxnOwnerInstant;
+
+  public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
+    this.lockManager = new LockManager(config, fs);
+  }
+
+  public synchronized void setLastCompletedTransaction(Option<HoodieInstant> instant) {
+    this.lastCompletedTxnOwnerInstant = instant;
+    lockManager.compareAndSetLatestCompletedWriteInstant(lockManager.getLatestCompletedWriteInstant().get(), instant);
+    LOG.info("Latest completed transaction instant " + instant);
+  }
+
+  public synchronized void setTransactionOwner(Option<HoodieInstant> instant) {
+    this.currentTxnOwnerInstant = instant;
+    LOG.info("Current transaction instant " + instant);
+  }
+
+  public synchronized void beginTransaction() {
+    LOG.info("Transaction starting");

Review comment:
       combine into a single log line? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/HoodieCommitOperation.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is used to hold all information used to identify how to resolve conflicts between instants.
+ * Since we interchange payload types between Avro-specific records and POJOs, this object serves as
+ * a common payload to manage these conversions.
+ */
+public class HoodieCommitOperation {

Review comment:
       rename: ConflictingOperation

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+    implements ConflictResolutionStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant,
+                                                 Option<HoodieInstant> lastSuccessfulInstant) {
+
+    // To find which instants are conflicting, we apply the following logic
+    // 1. Get completed instants timeline only for commits that have happened since the last successful write.
+    // 2. Get any scheduled or completed compaction or clustering operations that have started and/or finished
+    // after the current instant. We need to check for write conflicts since they may have mutated the same files
+    // that are being newly created by the current write.
+    Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+        .getCommitsTimeline()
+        // TODO : getWriteTimeline to ensure we include replace commits as well
+        .filterCompletedInstants()
+        .findInstantsAfter(lastSuccessfulInstant.isPresent() ? lastSuccessfulInstant.get().getTimestamp() : "0")

Review comment:
       can we avoid passing the `"0"` or use an existing constant for init instant time etc
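The suggestion can be sketched generically: name the "beginning of the timeline" sentinel once and derive the watermark from the optional last-successful instant. This uses `java.util.Optional` and a hypothetical constant name purely for illustration, not Hudi's actual constant.

```java
import java.util.Optional;

// Sketch: a single named sentinel for "no prior successful instant", instead
// of a bare "0" literal inlined in a ternary at the call site.
public class InstantWatermark {
  static final String INIT_INSTANT_TS = "0"; // hypothetical constant name

  public static String afterTimestamp(Optional<String> lastSuccessfulTs) {
    return lastSuccessfulTs.orElse(INIT_INSTANT_TS);
  }
}
```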

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/LockManager.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class wraps implementations of {@link LockProvider} and provides an easy way to manage the lifecycle of a lock.
+ */
+public class LockManager implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(LockManager.class);
+  private final HoodieWriteConfig writeConfig;
+  private final LockConfiguration lockConfiguration;
+  private final SerializableConfiguration hadoopConf;
+  private volatile LockProvider lockProvider;
+  // Holds the latest completed write instant to know which ones to check conflict against
+  private final AtomicReference<Option<HoodieInstant>> latestCompletedWriteInstant;
+
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+    this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty());
+    this.writeConfig = writeConfig;
+    this.hadoopConf = new SerializableConfiguration(fs.getConf());
+    this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
+  }
+
+  public void lock() {
+    if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      LockProvider lockProvider = getLockProvider();
+      boolean acquired = false;
+      try {
+        int retries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP);
+        long waitTimeInMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP);
+        int retryCount = 0;
+        while (retryCount <= retries) {
+          acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
+          if (acquired) {
+            break;
+          }
+          LOG.info("Retrying...");
+          Thread.sleep(waitTimeInMs);
+          retryCount++;
+        }
+      } catch (Exception e) {
+        throw new HoodieLockException("Unable to acquire lock ", e);
+      }
+      if (!acquired) {
+        throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock());
+      }
+    }
+  }
+
+  public void unlock() {
+    if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      getLockProvider().unlock();

Review comment:
       I assume the provider's unlock() will throw more exceptions. 
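If `unlock()` can throw, the caller's structure matters: release in a `finally`, and don't let an unlock failure mask the exception thrown by the guarded work itself. A minimal sketch of that pattern; `runLocked` is illustrative, not the PR's API.

```java
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

// Illustrative pattern: always release in finally, and swallow (but report)
// unlock failures so the original exception from the work can propagate.
public class GuardedSection {
  public static <T> T runLocked(Lock lock, Supplier<T> work) {
    lock.lock();
    try {
      return work.get();
    } finally {
      try {
        lock.unlock();
      } catch (RuntimeException unlockFailure) {
        // Deliberately not rethrown: rethrowing here would hide any
        // exception already propagating from work.get().
        System.err.println("unlock failed: " + unlockFailure.getMessage());
      }
    }
  }
}
```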

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/TransactionManager.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+
+/**
+ * This class allows clients to start and end transactions. Anything done between a start and end transaction is
+ * guaranteed to be atomic.
+ */
+public class TransactionManager implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(TransactionManager.class);
+
+  private final LockManager lockManager;
+  private Option<HoodieInstant> currentTxnOwnerInstant;
+  private Option<HoodieInstant> lastCompletedTxnOwnerInstant;
+
+  public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
+    this.lockManager = new LockManager(config, fs);
+  }
+
+  public synchronized void setLastCompletedTransaction(Option<HoodieInstant> instant) {
+    this.lastCompletedTxnOwnerInstant = instant;
+    lockManager.compareAndSetLatestCompletedWriteInstant(lockManager.getLatestCompletedWriteInstant().get(), instant);
+    LOG.info("Latest completed transaction instant " + instant);
+  }
+
+  public synchronized void setTransactionOwner(Option<HoodieInstant> instant) {
+    this.currentTxnOwnerInstant = instant;
+    LOG.info("Current transaction instant " + instant);
+  }
+
+  public synchronized void beginTransaction() {
+    LOG.info("Transaction starting");
+    LOG.info("Transaction Owner " + currentTxnOwnerInstant);
+    lockManager.lock();
+    LOG.info("Transaction started");
+  }
+
+  public synchronized void endTransaction() {
+    LOG.info("Transaction ending");

Review comment:
       same. combine?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+    implements ConflictResolutionStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant,
+                                                 Option<HoodieInstant> lastSuccessfulInstant) {
+
+    // To find which instants are conflicting, we apply the following logic
+    // 1. Get completed instants timeline only for commits that have happened since the last successful write.
+    // 2. Get any scheduled or completed compaction or clustering operations that have started and/or finished
+    // after the current instant. We need to check for write conflicts since they may have mutated the same files
+    // that are being newly created by the current write.
+    Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+        .getCommitsTimeline()
+        // TODO : getWriteTimeline to ensure we include replace commits as well
+        .filterCompletedInstants()
+        .findInstantsAfter(lastSuccessfulInstant.isPresent() ? lastSuccessfulInstant.get().getTimestamp() : "0")
+        .getInstants();
+
+    Stream<HoodieInstant> compactionAndClusteringTimeline = activeTimeline
+        .getTimelineOfActions(CollectionUtils.createSet(REPLACE_COMMIT_ACTION, COMPACTION_ACTION))
+        .findInstantsAfter(currentInstant.getTimestamp())
+        .getInstants();
+    return Stream.concat(completedCommitsInstantStream, compactionAndClusteringTimeline);
+  }
+
+  @Override
+  public boolean hasConflict(HoodieCommitOperation thisOperation, HoodieCommitOperation otherOperation) {
+    // TODO : UUID's can clash even for insert/insert, handle that case.
+    Set<String> fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
+    Set<String> fileIdsSetForSecondInstant = otherOperation.getMutatedFileIds();
+    Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
+    intersection.retainAll(fileIdsSetForSecondInstant);
+    if (!intersection.isEmpty()) {
+      LOG.error("Found conflicting writes between first operation = " + thisOperation

Review comment:
       is this an error though? can we move to INFO
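The overlap test being discussed is a plain set intersection over mutated file ids; a minimal, self-contained version of the same logic (class and parameter names are illustrative):

```java
import java.util.HashSet;
import java.util.Set;

// Minimal version of the overlap check: two concurrent writes conflict when
// they mutated at least one common file id.
public class FileIdConflict {
  public static boolean hasConflict(Set<String> firstMutatedFileIds, Set<String> secondMutatedFileIds) {
    // Copy one side so retainAll does not mutate the caller's set.
    Set<String> intersection = new HashSet<>(firstMutatedFileIds);
    intersection.retainAll(secondMutatedFileIds);
    return !intersection.isEmpty();
  }
}
```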

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A ZooKeeper-based lock. This {@link LockProvider} implementation allows locking table operations
+ * using ZooKeeper. Users need to have a ZooKeeper cluster deployed to be able to use this lock.
+ */
+@NotThreadSafe
+public class ZookeeperBasedLockProvider extends LockProvider {
+
+  private static final Logger LOG = LogManager.getLogger(ZookeeperBasedLockProvider.class);
+
+  private CuratorFramework curatorFrameworkClient;

Review comment:
       make it `final`. In general, can you make a pass to ensure what can be final is made final. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Different concurrency modes for write operations.
+ */
+public enum WriteConcurrencyMode {
+  // Only a single writer can perform write ops
+  SINGLE_WRITER("single_writer"),
+  // Multiple writers can perform write ops with lazy conflict resolution using locks
+  OPTIMISTIC_CONCURRENCY_CONTROL("optimistic_concurrency_control");
+
+  private final String value;
+
+  WriteConcurrencyMode(String value) {
+    this.value = value;
+  }
+
+  /**
+   * Getter for write concurrency mode.
+   * @return
+   */
+  public String value() {
+    return value;
+  }
+
+  /**
+   * Convert string value to WriteConcurrencyMode.
+   */
+  public static WriteConcurrencyMode fromValue(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "single_writer":
+        return SINGLE_WRITER;
+      case "optimistic_concurrency_control":
+        return OPTIMISTIC_CONCURRENCY_CONTROL;
+      default:
+        throw new HoodieException("Invalid value of Type.");
+    }
+  }
+
+  public boolean supportsOptimisticConcurrencyControl() {

Review comment:
       rename: isOptimistic... ?
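Alongside the rename, the string switch in `fromValue` could arguably be collapsed into `Enum.valueOf` with case normalization, since the config strings are just the lowercased constant names. A hedged sketch using a stand-in enum, not the actual `WriteConcurrencyMode` class:

```java
import java.util.Locale;

// Stand-in enum: derive the constant from the lowercase config value instead
// of enumerating each case in a switch statement.
public enum ConcurrencyMode {
  SINGLE_WRITER,
  OPTIMISTIC_CONCURRENCY_CONTROL;

  public static ConcurrencyMode fromValue(String value) {
    try {
      return valueOf(value.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      // Include the offending value in the message, unlike a bare
      // "Invalid value of Type." error.
      throw new IllegalArgumentException("Invalid write concurrency mode: " + value, e);
    }
  }

  // The reviewer's suggested naming style for the capability check.
  public boolean isOptimisticConcurrencyControl() {
    return this == OPTIMISTIC_CONCURRENCY_CONTROL;
  }
}
```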

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/HoodieMetadataConversionUtils.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.ReplaceArchivalHelper;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import java.io.IOException;
+
+/**
+ * Helper class to convert between different action related payloads and {@link HoodieArchivedMetaEntry}.
+ */
+public class HoodieMetadataConversionUtils {

Review comment:
       does this have a unit test of its own?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
-    return getTableAndInitCtx(metaClient, operationType);
+    if (HoodieTableVersion.current() != metaClient.getTableConfig().getTableVersion()) {
+      // TODO : Force clean up of all inflights, do this once pending rollback removal PR is landed
+      // this.rollbackFailedWrites();
+      this.txnManager.beginTransaction();

Review comment:
       should we check concurrency mode before taking these locks?
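The suggested guard can be sketched abstractly: make transaction begin/end a no-op unless optimistic concurrency control is enabled, so single-writer tables never touch the lock provider. `supportsOcc` below stands in for the real write-config check; the class is illustrative, not the PR's code.

```java
// Illustrative: skip lock machinery entirely in single-writer mode, so the
// upgrade path only pays for locking when OCC is actually configured.
public class TxnGuard {
  private final boolean supportsOcc; // stand-in for the write-config check
  private int locksTaken = 0;

  public TxnGuard(boolean supportsOcc) {
    this.supportsOcc = supportsOcc;
  }

  public void beginTransaction() {
    if (!supportsOcc) {
      return; // single-writer mode: no lock needed
    }
    locksTaken++; // stand-in for acquiring the actual lock
  }

  public int locksTaken() {
    return locksTaken;
  }
}
```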

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
##########
@@ -82,6 +84,10 @@ public static WriteOperationType fromValue(String value) {
         return INSERT_OVERWRITE_TABLE;
       case "cluster":
         return CLUSTER;
+      case "compact":
+        return COMPACT;
+      case "unknown":

Review comment:
       what is this really? how can there be a write that is unknown?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to 
lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able 
to use this lock.
+ */
+@NotThreadSafe

Review comment:
       But we seem to synchronize down below?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void 
completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, 
String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, 
HoodieTableVersion.current(), config, context, instantTime);
-    return getTableAndInitCtx(metaClient, operationType);
+    if (HoodieTableVersion.current() != 
metaClient.getTableConfig().getTableVersion()) {

Review comment:
       Let's add a method `needsUpgradeOrDowngrade()` to the upgrade/downgrade class?
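
Roughly the shape being suggested — `TableVersion` is a stand-in for `HoodieTableVersion`, and the helper name is the one proposed above:

```java
public class UpgradeDowngradeSketch {
  // Illustrative stand-in for HoodieTableVersion
  enum TableVersion { ZERO, ONE, TWO }

  private final TableVersion versionOnDisk;

  UpgradeDowngradeSketch(TableVersion versionOnDisk) {
    this.versionOnDisk = versionOnDisk;
  }

  // Callers ask a named question instead of comparing versions inline
  boolean needsUpgradeOrDowngrade(TableVersion toVersion) {
    return toVersion != versionOnDisk;
  }
}
```

The client would then call `needsUpgradeOrDowngrade(HoodieTableVersion.current())` instead of leaking the version comparison into `getTableAndInitCtx`.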

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to 
lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able 
to use this lock.
+ */
+@NotThreadSafe
+public class ZookeeperBasedLockProvider extends LockProvider {
+
+  private static final Logger LOG = 
LogManager.getLogger(ZookeeperBasedLockProvider.class);
+
+  private CuratorFramework curatorFrameworkClient;
+  private volatile InterProcessMutex lock = null;
+
+  public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, 
final Configuration conf) {
+    this(lockConfiguration);
+    this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
+        
.connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP))
+        .retryPolicy(new 
BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
+            5000, 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP)))
+        
.sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP,
 DEFAULT_ZK_SESSION_TIMEOUT_MS))
+        
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP,
 DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
+        .build();
+    this.curatorFrameworkClient.start();
+  }
+
+  public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, 
final CuratorFramework curatorFrameworkClient) {
+    this(lockConfiguration);
+    this.curatorFrameworkClient = curatorFrameworkClient;
+    synchronized (this.curatorFrameworkClient) {
+      if (this.curatorFrameworkClient.getState() != 
CuratorFrameworkState.STARTED) {
+        this.curatorFrameworkClient.start();
+      }
+    }
+  }
+
+  ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration) {
+    checkRequiredProps(lockConfiguration);
+    this.lockConfiguration = lockConfiguration;
+  }
+  
+  public void acquireLock(long time, TimeUnit unit) throws Exception {
+    ValidationUtils.checkArgument(this.lock == null, 
LockState.ALREADY_ACQUIRED.name());
+    InterProcessMutex newLock = new InterProcessMutex(
+        this.curatorFrameworkClient, 
lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP) + "/"
+        + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP));
+    newLock.acquire(time, unit);
+    if (newLock.isAcquiredInThisProcess()) {
+      lock = newLock;
+    }
+  }
+
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) {
+    LOG.info(generateLogStatement(LockState.ACQUIRING, 
generateLogSuffixString()));
+    try {
+      acquireLock(time, unit);
+      LOG.info(generateLogStatement(LockState.ACQUIRED, 
generateLogSuffixString()));
+    } catch (Exception e) {
+      throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, 
generateLogSuffixString()), e);

Review comment:
       So, the lock can be acquired by another process and we don't raise this 
exception? Is that ok? I see lines 94-96 above, where we simply do the 
assignment and don't raise any exception from an `else` block.
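
A sketch of one way `tryLock` could surface the not-acquired case, using a `ReentrantLock` as a stand-in for Curator's `InterProcessMutex` (which likewise returns without throwing when `acquire` times out):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockSketch {
  // Stand-in for the InterProcessMutex held by the provider
  private final ReentrantLock mutex;
  private volatile boolean held = false;

  TryLockSketch(ReentrantLock mutex) {
    this.mutex = mutex;
  }

  public boolean tryLock(long time, TimeUnit unit) {
    try {
      // Check the acquisition outcome instead of assuming success:
      // false here means another process/thread holds the lock
      held = mutex.tryLock(time, unit);
      return held;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("FAILED_TO_ACQUIRE", e);
    }
  }
}
```

Returning `false` (or raising from an `else` branch) lets the caller distinguish "timed out waiting for another holder" from "acquired", instead of logging `ACQUIRED` unconditionally.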

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/HoodieMetadataConversionUtils.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.ReplaceArchivalHelper;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import java.io.IOException;
+
+/**
+ * Helper class to convert between different action related payloads and 
{@link HoodieArchivedMetaEntry}.
+ */
+public class HoodieMetadataConversionUtils {
+
+  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant 
hoodieInstant, HoodieTableMetaClient metaClient) throws IOException {
+    HoodieArchivedMetaEntry archivedMetaWrapper = new 
HoodieArchivedMetaEntry();
+    archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
+    archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+    switch (hoodieInstant.getAction()) {
+      case HoodieTimeline.CLEAN_ACTION: {
+        if (hoodieInstant.isCompleted()) {
+          
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
 hoodieInstant));
+        } else {
+          
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
 hoodieInstant));
+        }
+        archivedMetaWrapper.setActionType(ActionType.clean.name());
+        break;
+      }
+      case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.DELTA_COMMIT_ACTION: {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+                
.fromBytes(metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieCommitMetadata.class);
+        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
+        archivedMetaWrapper.setActionType(ActionType.commit.name());
+        break;
+      }
+      case HoodieTimeline.REPLACE_COMMIT_ACTION: {
+        HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata
+                
.fromBytes(metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieReplaceCommitMetadata.class);
+        
archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
+        break;
+      }
+      case HoodieTimeline.ROLLBACK_ACTION: {
+        
archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
+                
metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), 
HoodieRollbackMetadata.class));
+        archivedMetaWrapper.setActionType(ActionType.rollback.name());
+        break;
+      }
+      case HoodieTimeline.SAVEPOINT_ACTION: {
+        
archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
+                
metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), 
HoodieSavepointMetadata.class));
+        archivedMetaWrapper.setActionType(ActionType.savepoint.name());
+        break;
+      }
+      case HoodieTimeline.COMPACTION_ACTION: {
+        HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant.getTimestamp());
+        archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        archivedMetaWrapper.setActionType(ActionType.compaction.name());
+        break;
+      }
+      default: {
+        throw new UnsupportedOperationException("Action not fully supported 
yet");
+      }
+    }
+    return archivedMetaWrapper;
+  }
+
+  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant 
hoodieInstant,
+                                                          HoodieCommitMetadata 
hoodieCommitMetadata) {
+    HoodieArchivedMetaEntry archivedMetaWrapper = new 
HoodieArchivedMetaEntry();
+    archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
+    archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+    
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(hoodieCommitMetadata));
+    archivedMetaWrapper.setActionType(ActionType.commit.name());
+    return archivedMetaWrapper;
+  }
+
+  public static org.apache.hudi.avro.model.HoodieCommitMetadata 
convertCommitMetadata(
+          HoodieCommitMetadata hoodieCommitMetadata) {
+    ObjectMapper mapper = new ObjectMapper();
+    // Need this to ignore other public get() methods
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    org.apache.hudi.avro.model.HoodieCommitMetadata avroMetaData =
+            mapper.convertValue(hoodieCommitMetadata, 
org.apache.hudi.avro.model.HoodieCommitMetadata.class);
+    // Do not archive Rolling Stats, cannot set to null since AVRO will throw 
null pointer
+    
avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY,
 "");
+    return avroMetaData;
+  }
+
+  // TODO : Fix converting from SpecificRecord to POJO

Review comment:
       Would this come back to haunt us?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -401,13 +441,25 @@ protected void 
completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   public void syncTableMetadata() {
     // Open up the metadata table again, for syncing
-    try (HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
+    try {
+      HoodieTableMetadataWriter writer =
+          SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, 
context);
       LOG.info("Successfully synced to metadata table");
     } catch (Exception e) {
       throw new HoodieMetadataException("Error syncing to metadata table.", e);
     }
   }
 
+  @Override
+  protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    // Create a Hoodie table after startTxn which encapsulated the commits and 
files visible.
+    // Important to create this after the lock to ensure latest commits show 
up in the timeline without need for reload
+    HoodieTable table = createTable(config, hadoopConf);

Review comment:
       Should we recreate? Again, we need to revisit this whole approach of 
passing the metadata writer to resolve conflicts. I don't understand this part.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for 
concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+    implements ConflictResolutionStrategy {
+
+  private static final Logger LOG = 
LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline 
activeTimeline, HoodieInstant currentInstant,
+                                                 Option<HoodieInstant> 
lastSuccessfulInstant) {
+
+    // To find which instants are conflicting, we apply the following logic
+    // 1. Get completed instants timeline only for commits that have happened 
since the last successful write.
+    // 2. Get any scheduled or completed compaction or clustering operations 
that have started and/or finished
+    // after the current instant. We need to check for write conflicts since 
they may have mutated the same files
+    // that are being newly created by the current write.
+    Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+        .getCommitsTimeline()
+        // TODO : getWriteTimeline to ensure we include replace commits as well
+        .filterCompletedInstants()
+        .findInstantsAfter(lastSuccessfulInstant.isPresent() ? 
lastSuccessfulInstant.get().getTimestamp() : "0")
+        .getInstants();
+
+    Stream<HoodieInstant> compactionAndClusteringTimeline = activeTimeline
+        .getTimelineOfActions(CollectionUtils.createSet(REPLACE_COMMIT_ACTION, 
COMPACTION_ACTION))
+        .findInstantsAfter(currentInstant.getTimestamp())
+        .getInstants();
+    return Stream.concat(completedCommitsInstantStream, 
compactionAndClusteringTimeline);
+  }
+
+  @Override
+  public boolean hasConflict(HoodieCommitOperation thisOperation, 
HoodieCommitOperation otherOperation) {
+    // TODO : UUID's can clash even for insert/insert, handle that case.
+    Set<String> fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
+    Set<String> fileIdsSetForSecondInstant = 
otherOperation.getMutatedFileIds();
+    Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
+    intersection.retainAll(fileIdsSetForSecondInstant);
+    if (!intersection.isEmpty()) {
+      LOG.error("Found conflicting writes between first operation = " + 
thisOperation
+          + ", second operation = " + otherOperation + " , intersecting file 
ids " + intersection);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> 
resolveConflict(Option<HoodieBackedTableMetadataWriter> metadataWriter, 
HoodieTable table,
+                                              HoodieCommitOperation 
thisOperation, HoodieCommitOperation otherOperation) {
+    // NOTE that any commits from table services such as compaction, 
clustering or cleaning since the
+    // overlapping of files is handled using MVCC. Since compaction is 
eventually written as commit, we need to ensure
+    // we handle this during conflict resolution and not treat the commit from 
compaction operation as a regular commit.
+    if (otherOperation.getOperationType() == WriteOperationType.UNKNOWN

Review comment:
       But a replace commit can also result from INSERT_OVERWRITE, correct? How 
do we distinguish this? I feel we need a more nuanced check here, so that only 
writes fail each other.
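
One possible shape of the more nuanced check — keying off the operation type recorded in the commit metadata rather than the commit action alone, since both INSERT_OVERWRITE and clustering produce replace commits. The names mirror `WriteOperationType` values, but the method itself is illustrative, not the Hudi implementation:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ConflictCheckSketch {
  // Table services whose file overlaps are reconciled via MVCC, not OCC
  private static final Set<String> TABLE_SERVICE_OPS =
      new HashSet<>(Arrays.asList("cluster", "compact"));

  static boolean shouldFail(String otherOperationType,
                            Set<String> ourFileIds, Set<String> theirFileIds) {
    if (TABLE_SERVICE_OPS.contains(otherOperationType)) {
      return false; // skip conflict resolution for table-service commits
    }
    // Only genuine writer-writer file overlaps fail the commit
    Set<String> overlap = new HashSet<>(ourFileIds);
    overlap.retainAll(theirFileIds);
    return !overlap.isEmpty();
  }
}
```

Under this check an overlapping `insert_overwrite` replace commit would still fail the current write, while a clustering replace commit would not.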

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.HoodieCommitOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class TransactionUtils {
+
+  private static final Logger LOG = 
LogManager.getLogger(TransactionUtils.class);
+
+  /**
+   * Resolve any write conflicts when committing data.
+   * @param table
+   * @param metadataWriter
+   * @param currentTxnOwnerInstant
+   * @param thisCommitMetadata
+   * @param config
+   * @param lastCompletedTxnOwnerInstant
+   * @return
+   * @throws HoodieWriteConflictException
+   */
+  public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final 
HoodieTable table, final Option<HoodieBackedTableMetadataWriter> metadataWriter,
+                                                                       final 
Option<HoodieInstant> currentTxnOwnerInstant, final 
Option<HoodieCommitMetadata> thisCommitMetadata,
+                                                                       final 
HoodieWriteConfig config, Option<HoodieInstant> lastCompletedTxnOwnerInstant)
+      throws HoodieWriteConflictException {
+    if 
(config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      ConflictResolutionStrategy resolutionStrategy = 
config.getWriteConflictResolutionStrategy();
+      Stream<HoodieInstant> instantStream = 
resolutionStrategy.getInstantsStream(table.getActiveTimeline(), 
currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant);
+      // TODO : metadataWriter.reload() inside resolve write conflict ??

Review comment:
       Do we need this? Is this TODO still relevant?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import 
org.apache.hudi.client.lock.SimpleConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.lock.LockProvider;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS;

Review comment:
       Are these ever used in `hudi-common`? If we don't anticipate readers 
using this, we should just keep all of this in `hudi-client-common` under a 
transaction package.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void 
completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, 
String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, 
HoodieTableVersion.current(), config, context, instantTime);
-    return getTableAndInitCtx(metaClient, operationType);
+    if (HoodieTableVersion.current() != 
metaClient.getTableConfig().getTableVersion()) {
+      // TODO : Force clean up of all inflights, do this once pending rollback 
removal PR is landed
+      // this.rollbackFailedWrites();
+      this.txnManager.beginTransaction();
+      try {
+        // Ensure no inflight commits
+        TransactionUtils.resolveConflictIfAnyForUpgradeDowngrade(metaClient);
+        new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, 
HoodieTableVersion.current(), config, context, instantTime);
+      } finally {
+        this.txnManager.endTransaction();
+      }
+    }
+    return getTableAndInitCtx(metaClient, operationType, instantTime);
   }
 
-  private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, 
WriteOperationType operationType) {
+  // TODO : To enforce priority between table service and ingestion writer, 
use transactions here and invoke strategy

Review comment:
       Is this comment still valid? What should we do about this issue? Can you elaborate?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.HoodieCommitOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class TransactionUtils {
+
+  private static final Logger LOG = 
LogManager.getLogger(TransactionUtils.class);
+
+  /**
+   * Resolve any write conflicts when committing data.
+   * @param table
+   * @param metadataWriter
+   * @param currentTxnOwnerInstant
+   * @param thisCommitMetadata
+   * @param config
+   * @param lastCompletedTxnOwnerInstant
+   * @return
+   * @throws HoodieWriteConflictException
+   */
+  public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final 
HoodieTable table, final Option<HoodieBackedTableMetadataWriter> metadataWriter,

Review comment:
       Can we unit test this method?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -401,13 +441,25 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   public void syncTableMetadata() {
     // Open up the metadata table again, for syncing
-    try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
+    try {
+      HoodieTableMetadataWriter writer =

Review comment:
       We lose auto-closing by moving to a regular try-catch. Why is this change needed?
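For context, try-with-resources guarantees `close()` runs even when the body throws, which a hand-rolled try-catch does not unless a `finally` is added. A minimal generic sketch (names are illustrative, not the actual Hudi classes):

```java
// A resource that records whether close() was invoked.
class TrackingResource implements AutoCloseable {
    static boolean closed = false;

    void doWork() {
        // simulate the metadata sync failing mid-way
        throw new IllegalStateException("simulated failure during sync");
    }

    @Override
    public void close() {
        closed = true; // runs automatically when the try block exits
    }
}

class TryWithResourcesDemo {
    static boolean runAndCheckClosed() {
        TrackingResource.closed = false;
        try (TrackingResource r = new TrackingResource()) {
            r.doWork();
        } catch (IllegalStateException e) {
            // by the time we land here, close() has already run
        }
        return TrackingResource.closed;
    }
}
```

With a plain try-catch and no `finally`, the resource would stay open on the exception path.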

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/lock/LockProvider.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Pluggable lock implementations using this provider class.
+ */
+public abstract class LockProvider<T> implements Lock, AutoCloseable {

Review comment:
       can this be an interface?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommonMetadata.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+
+public class HoodieCommonMetadata {

Review comment:
       rename: HoodieMetadataWrapper

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
##########
@@ -222,6 +225,17 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
     LOG.info("Committing metadata bootstrap !!");
   }
 
+  @Override
+  protected void syncTableMetadata() {

Review comment:
       I think all this can be removed from all the action executors. Can't we take the lock in post-commit/write at the write client level?
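A rough sketch of what taking the lock once at the write client level could look like; the lock field and method names here are stand-ins, not the actual `TransactionManager` API:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical client that owns the transaction boundary itself, so
// individual action executors no longer need their own locking.
class ClientLevelTxnSketch {
    private final ReentrantLock txnLock = new ReentrantLock();
    int syncCount = 0;

    // placeholder for the real metadata table sync
    void syncTableMetadata() {
        syncCount++;
    }

    // the client wraps the sync in one begin/end pair after each commit
    void postCommit() {
        txnLock.lock();
        try {
            syncTableMetadata();
        } finally {
            txnLock.unlock(); // always released, even if the sync throws
        }
    }
}
```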

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Configuration for managing locks. Since this configuration needs to be shared with HiveMetaStore based lock,
+ * which is in a different package than other lock providers, we use this as a data transfer object in hoodie-common
+ */
+public class LockConfiguration implements Serializable {
+
+  public static final String LOCK_PREFIX = "hoodie.writer.lock.";
+  public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "wait_time_ms_between_retry";
+  public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(5000L);
+  public static final String LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "client.wait_time_ms_between_retry";
+  public static final String DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(10000L);
+  public static final String LOCK_ACQUIRE_NUM_RETRIES_PROP = LOCK_PREFIX + "num_retries";
+  public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES = String.valueOf(3);
+  public static final String LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP = LOCK_PREFIX + "client.num_retries";
+  public static final String DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES = String.valueOf(0);
+  public static final String LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP = LOCK_PREFIX + "wait_time_ms";
+  public static final int DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS = 60 * 1000;
+  // configs for file system based locks. NOTE: This only works for DFS with atomic create/delete operation
+  public static final String FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "filesystem.";
+  public static final String FILESYSTEM_LOCK_PATH_PROP = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "path";
+  // configs for metastore based locks
+  public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "hivemetastore.";
+  public static final String HIVE_DATABASE_NAME_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "database";
+  public static final String HIVE_TABLE_NAME_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "table";
+  // Zookeeper configs for zk based locks
+  public static final String ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "zookeeper.";
+  public static final String ZK_BASE_PATH_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "zk_base_path";

Review comment:
       I think the common term is ZooKeeper chroot?
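For reference, ZooKeeper itself calls this kind of base-path namespace a "chroot", and it is expressed as a suffix of the connect string rather than a separate property. A toy sketch (the helper name is hypothetical):

```java
// Illustrates how a ZooKeeper chroot attaches to the quorum string:
// every path the client uses is then resolved relative to the chroot.
class ZkChrootSketch {
    static String withChroot(String quorum, String chroot) {
        // e.g. "zk1:2181,zk2:2181" + "/hudi/locks"
        return quorum + chroot;
    }
}
```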

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -195,30 +126,24 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
 
   @Override
   public HoodieCleanMetadata execute() {
+    List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
     // If there are inflight(failed) or previously requested clean operation, first perform them
     List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
         .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
     if (pendingCleanInstants.size() > 0) {
       pendingCleanInstants.forEach(hoodieInstant -> {
         LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
         try {
-          runPendingClean(table, hoodieInstant);
+          cleanMetadataList.add(runPendingClean(table, hoodieInstant));
         } catch (Exception e) {
           LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
         }
       });
       table.getMetaClient().reloadActiveTimeline();
     }
-
-    // Plan and execute a new clean action
-    Option<HoodieCleanerPlan> cleanerPlanOpt = requestClean(instantTime);
-    if (cleanerPlanOpt.isPresent()) {
-      table.getMetaClient().reloadActiveTimeline();
-      HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
-      if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null) && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()) {
-        return runClean(table, HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan);
-      }
-    }
-    return null;
+    // return the last clean metadata for now
+    // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services

Review comment:
       I don't understand why we just pick the last clean metadata. Let's do the generically right thing: if you want to handle more than one cleaning operation, let's return a list?
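The shape of the suggested fix, with plain strings standing in for `HoodieInstant`/`HoodieCleanMetadata` (a sketch, not the real executor):

```java
import java.util.ArrayList;
import java.util.List;

// Instead of silently keeping only the last clean's metadata, collect and
// return every result so downstream consumers see all the deletes.
class CleanExecutorSketch {
    // stand-in for runPendingClean(table, instant)
    static String runPendingClean(String instant) {
        return "cleaned-" + instant;
    }

    // returns all clean metadata rather than just the last entry
    static List<String> executeAll(List<String> pendingInstants) {
        List<String> results = new ArrayList<>();
        for (String instant : pendingInstants) {
            results.add(runPendingClean(instant));
        }
        return results;
    }
}
```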

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class BaseCleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {
+
+  private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
+
+  private final Option<Map<String, String>> extraMetadata;
+
+  public BaseCleanPlanActionExecutor(HoodieEngineContext context,
+                                     HoodieWriteConfig config,
+                                     HoodieTable<T, I, K, O> table,
+                                     String instantTime,
+                                     Option<Map<String, String>> extraMetadata) {
+    super(context, config, table, instantTime);
+    this.extraMetadata = extraMetadata;
+  }
+
+  protected abstract Option<HoodieCleanerPlan> createCleanerPlan();
+
+  /**
+   * Generates List of files to be cleaned.
+   *
+   * @param context HoodieEngineContext
+   * @return Cleaner Plan
+   */
+  HoodieCleanerPlan requestClean(HoodieEngineContext context) {

Review comment:
       I assume this is all just code moved from the other class. If not, please point out what has changed.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -43,7 +43,8 @@
   public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy";
   public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
   public static final String ASYNC_CLEAN_PROP = "hoodie.clean.async";
-
+  // Turn on inline cleaning
+  public static final String INLINE_CLEAN_PROP = "hoodie.clean.inline";

Review comment:
       Should there be a single property? i.e. `hoodie.clean.async=false` does imply `hoodie.clean.inline=true`, right?
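The implication can be written down directly: assuming auto clean is enabled, "not async" already determines "inline", so a separate flag is redundant (property semantics as read from the diff; the helper is hypothetical):

```java
// hoodie.clean.automatic && !hoodie.clean.async  =>  inline cleaning,
// so hoodie.clean.inline would carry no extra information.
class CleanModeSketch {
    static boolean isInlineClean(boolean autoClean, boolean asyncClean) {
        return autoClean && !asyncClean;
    }
}
```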

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -195,30 +126,24 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
 
   @Override
   public HoodieCleanMetadata execute() {
+    List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
     // If there are inflight(failed) or previously requested clean operation, first perform them
     List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
         .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
     if (pendingCleanInstants.size() > 0) {
       pendingCleanInstants.forEach(hoodieInstant -> {
         LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
         try {
-          runPendingClean(table, hoodieInstant);
+          cleanMetadataList.add(runPendingClean(table, hoodieInstant));
         } catch (Exception e) {
           LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
         }
       });
       table.getMetaClient().reloadActiveTimeline();
     }
-
-    // Plan and execute a new clean action
-    Option<HoodieCleanerPlan> cleanerPlanOpt = requestClean(instantTime);
-    if (cleanerPlanOpt.isPresent()) {
-      table.getMetaClient().reloadActiveTimeline();
-      HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
-      if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null) && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()) {
-        return runClean(table, HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan);
-      }
-    }
-    return null;
+    // return the last clean metadata for now
+    // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services

Review comment:
       Won't this, for example, mess up the metadata table by missing some deletes?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/lock/LockProvider.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Pluggable lock implementations using this provider class.
+ */
+public abstract class LockProvider<T> implements Lock, AutoCloseable {

Review comment:
       If there is no shared code here, we should go for an interface instead of an abstract class.
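A sketch of the interface variant, with a default method covering the small shared behavior; the signature is illustrative and may not match `LockProvider` exactly:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// With no shared state, an interface keeps implementations free to extend
// some other base class; default methods can host trivial shared logic.
interface SimpleLockProvider<T> extends AutoCloseable {
    boolean tryLock(long time, TimeUnit unit);

    void unlock();

    // a default method in place of what the abstract class would share
    @Override
    default void close() {
        unlock();
    }
}

// A local, in-process implementation backed by a ReentrantLock.
class LocalLockProvider implements SimpleLockProvider<ReentrantLock> {
    private final ReentrantLock lock = new ReentrantLock();

    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        try {
            return lock.tryLock(time, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void unlock() {
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}
```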

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/HoodieMetadataConversionUtils.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.ReplaceArchivalHelper;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import java.io.IOException;
+
+/**
+ * Helper class to convert between different action related payloads and {@link HoodieArchivedMetaEntry}.
+ */
+public class HoodieMetadataConversionUtils {

Review comment:
       rename to `MetadataConversionUtils` ? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -117,12 +126,24 @@ protected String getCommitActionType() {
   protected void commitOnAutoCommit(HoodieWriteMetadata result) {
     if (config.shouldAutoCommit()) {
       LOG.info("Auto commit enabled: Committing " + instantTime);
-      commit(extraMetadata, result);
+      autoCommit(extraMetadata, result);
     } else {
       LOG.info("Auto commit disabled for " + instantTime);
     }
   }
 
+  protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
+    this.txnManager.beginTransaction();
+    try {
+      // TODO : Refactor this method so we can pass a valid metadata table writer

Review comment:
       revisit the todo?

##########
File path: 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
##########
@@ -224,6 +231,16 @@ protected void completeCompaction(HoodieCommitMetadata metadata,
     return getTableAndInitCtx(metaClient, operationType);
   }
 
+  @Override
+  protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+    // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
+    HoodieTable table = createTable(config, hadoopConf);
+    // TODO : Metadata Writer is not supported for Java

Review comment:
       revisit comment.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.HoodieCommitOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class TransactionUtils {
+
+  private static final Logger LOG = LogManager.getLogger(TransactionUtils.class);
+
+  /**
+   * Resolve any write conflicts when committing data.
+   * @param table
+   * @param metadataWriter
+   * @param currentTxnOwnerInstant
+   * @param thisCommitMetadata
+   * @param config
+   * @param lastCompletedTxnOwnerInstant
+   * @return
+   * @throws HoodieWriteConflictException
+   */
+  public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final HoodieTable table, final Option<HoodieBackedTableMetadataWriter> metadataWriter,

Review comment:
       is the `metadataWriter` really used inside this method?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -93,4 +95,28 @@ private static HoodieCommitMetadata buildMetadataFromStats(List<HoodieWriteStat>
         + "numReplaceFileIds:" + partitionToReplaceFileIds.values().stream().mapToInt(e -> e.size()).sum());
     return commitMetadata;
   }
+
+  public static HashMap<String, String> getFileIdWithoutSuffixAndRelativePaths(Map<String, List<org.apache.hudi.avro.model.HoodieWriteStat>>

Review comment:
       Let's unit test these?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
-    return getTableAndInitCtx(metaClient, operationType);
+    if (HoodieTableVersion.current() != metaClient.getTableConfig().getTableVersion()) {
+      // TODO : Force clean up of all inflights, do this once pending rollback removal PR is landed

Review comment:
       Let's clean this up?

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -265,6 +266,20 @@ public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
     throw new HoodieNotSupportedException("Bootstrap is not supported yet");
   }
 
+  /**
+   * TODO :
+   * Refactor {@link FlinkCleanActionExecutor} to support scheduling of cleaning.
+   * @param context HoodieEngineContext
+   * @param instantTime Instant Time for scheduling cleaning
+   * @param extraMetadata additional metadata to write into plan
+   * @return
+   */
+  @Override
+  public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {

Review comment:
       this does not seem ok to do? 

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -313,6 +320,16 @@ public void cleanHandles() {
     return writeHandle;
   }
 
+  @Override
+  protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+    // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
+    HoodieTable table = createTable(config, hadoopConf);
+    // TODO : Metadata Writer is not supported for Flink

Review comment:
       revisit todo?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/TableService.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+
+/**
+ * Supported runtime table services.
+ */
+public enum TableService {

Review comment:
       rename: TableServiceType




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

