n3nash commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r587777089



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -30,16 +31,19 @@
 import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.lock.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableService;

Review comment:
       I kept it here because WriteOperationType is also here.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -359,10 +388,21 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
   * Common method containing steps to be performed before write (upsert/insert/...
   * @param instantTime
   * @param writeOperationType
+   * @param metaClient
   */
-  protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
+  protected void preWrite(String instantTime, WriteOperationType writeOperationType,
+      HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
-    syncTableMetadata();
+    this.txnManager.setLastCompletedTransaction(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+        .lastInstant());
+    LOG.info("Last Instant Cached by writer with instant " + instantTime + " is " + this.txnManager.getLastCompletedTransactionOwner());
+    this.txnManager.setTransactionOwner(Option.of(new HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)));
+    this.txnManager.beginTransaction();

Review comment:
       I also realized this during implementation, but wanted to keep the 
`beginTransaction(..)` API simple. I've added an overloaded method now.
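
   A minimal stdlib sketch of the shape being discussed: a no-argument 
`beginTransaction()` kept simple, plus a variant that also records the 
transaction owner and last-completed instant before acquiring. The class and 
field names here are illustrative stand-ins, not Hudi's actual 
`TransactionManager`, and instants are reduced to strings.

```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only: a tiny transaction manager with a simple
// beginTransaction() plus a variant carrying owner / last-completed state.
class SketchTransactionManager {
  private final ReentrantLock lock = new ReentrantLock();
  private Optional<String> owner = Optional.empty();
  private Optional<String> lastCompleted = Optional.empty();

  // Simple API: acquire the lock with no extra bookkeeping.
  void beginTransaction() {
    lock.lock();
  }

  // Variant: record owner and last-completed instant, then acquire.
  void beginTransaction(Optional<String> owner, Optional<String> lastCompleted) {
    this.owner = owner;
    this.lastCompleted = lastCompleted;
    lock.lock();
  }

  // Releases the lock (if held) and clears per-transaction state.
  void endTransaction() {
    owner = Optional.empty();
    lastCompleted = Optional.empty();
    if (lock.isHeldByCurrentThread()) {
      lock.unlock();
    }
  }

  Optional<String> getOwner() {
    return owner;
  }
}
```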

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConflictResolutionStrategy.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.stream.Stream;
+
+/**
+ * Strategy interface for conflict resolution with multiple writers.
+ * Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
+ * writers are mutating the hoodie table.
+ */
+public interface ConflictResolutionStrategy {
+
+  /**
+   * Stream of instants to check conflicts against.
+   * @return
+   */
+  Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
+
+  /**
+   * Implementations of this method will determine whether a conflict exists between 2 commits.
+   * @param thisOperation
+   * @param otherOperation
+   * @return
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  boolean hasConflict(HoodieCommitOperation thisOperation, HoodieCommitOperation otherOperation);
+
+  /**
+   * Implementations of this method will determine how to resolve a conflict between 2 commits.
+   * @param thisOperation
+   * @param otherOperation
+   * @return
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  Option<HoodieCommitMetadata> resolveConflict(Option<HoodieBackedTableMetadataWriter> metadataWriter, HoodieTable table,

Review comment:
       This is being passed to allow the metadata to be manipulated for some 
kind of conflict resolution. Right now it's not used anywhere; I can remove 
it, but it will need to be added back soon, once conflict resolution needs to 
do more than just throw an exception.
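
   For intuition, the kind of check a file-writes-based strategy like 
`SimpleConcurrentFileWritesConflictResolutionStrategy` presumably performs is 
set intersection over the files each commit touched. A self-contained sketch 
(names illustrative, file groups reduced to strings):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative only: two operations conflict when they wrote overlapping files.
class SketchConflictCheck {
  static boolean hasConflict(Set<String> filesWrittenByThis, Set<String> filesWrittenByOther) {
    // Intersect the two write sets; any overlap means a conflict.
    Set<String> overlap = new HashSet<>(filesWrittenByThis);
    overlap.retainAll(filesWrittenByOther);
    return !overlap.isEmpty();
  }
}
```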

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/HoodieCommitOperation.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is used to hold all information used to identify how to resolve conflicts between instants.
+ * Since we interchange payload types between AVRO specific records and POJO's, this object serves as
+ * a common payload to manage these conversions.
+ */
+public class HoodieCommitOperation {

Review comment:
       This just wraps the `CommitMetadata` into a common payload. 
`ConflictingOperation` suggests this is already a conflicting operation, which 
it is not yet. Open to other suggestions if you have any.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able to use this lock.
+ */
+@NotThreadSafe

Review comment:
       The synchronized method is just for tests to start ZK; otherwise 
parallel tests end up triggering start multiple times, and that causes issues. 
Since we don't use `@VisibleForTesting`, I've put a comment instead.
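
   The providers here share a bounded-retry acquisition loop driven by the 
`LOCK_ACQUIRE_NUM_RETRIES_PROP` / retry-wait-time configs. A stdlib sketch of 
that pattern, with `ReentrantLock` standing in for the Curator 
`InterProcessMutex` (retry counts and names illustrative):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative: bounded-retry lock acquisition, the pattern a provider
// configured with num-retries and retry-wait-time would follow.
class SketchLockAcquire {
  static boolean tryLockWithRetries(ReentrantLock lock, int numRetries, long waitMs) {
    for (int attempt = 0; attempt <= numRetries; attempt++) {
      try {
        // Wait up to waitMs per attempt before retrying.
        if (lock.tryLock(waitMs, TimeUnit.MILLISECONDS)) {
          return true; // acquired
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false; // exhausted retries; caller would raise a lock exception
  }
}
```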

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/HoodieMetadataConversionUtils.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.ReplaceArchivalHelper;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import java.io.IOException;
+
+/**
+ * Helper class to convert between different action related payloads and {@link HoodieArchivedMetaEntry}.
+ */
+public class HoodieMetadataConversionUtils {

Review comment:
       Added one now

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import 
org.apache.hudi.client.lock.SimpleConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.lock.LockProvider;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_PORT_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+
+/**
+ * Hoodie Configs for Locks.
+ */
+public class HoodieLockConfig extends DefaultHoodieConfig {
+
+  // Pluggable type of lock provider
+  public static final String LOCK_PROVIDER_CLASS_PROP = LOCK_PREFIX + "provider";
+  public static final String DEFAULT_LOCK_PROVIDER_CLASS = ZookeeperBasedLockProvider.class.getName();
+  // Pluggable strategies to use when resolving conflicts
+  public static final String WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP =
+      LOCK_PREFIX + "conflict.resolution.strategy";
+  public static final String DEFAULT_WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS =
+      SimpleConcurrentFileWritesConflictResolutionStrategy.class.getName();
+
+  private HoodieLockConfig(Properties props) {
+    super(props);
+  }
+
+  public static HoodieLockConfig.Builder newBuilder() {
+    return new HoodieLockConfig.Builder();
+  }
+
+  public static class Builder {
+
+    private final Properties props = new Properties();
+
+    public HoodieLockConfig.Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        this.props.load(reader);
+        return this;
+      }
+    }
+
+    public HoodieLockConfig.Builder fromProperties(Properties props) {
+      this.props.putAll(props);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withLockProvider(Class<? extends LockProvider> lockProvider) {
+      props.setProperty(LOCK_PROVIDER_CLASS_PROP, lockProvider.getName());
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withHiveDatabaseName(String databaseName) {
+      props.setProperty(HIVE_DATABASE_NAME_PROP, databaseName);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withHiveTableName(String tableName) {
+      props.setProperty(HIVE_TABLE_NAME_PROP, tableName);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkQuorum(String zkQuorum) {
+      props.setProperty(ZK_CONNECT_URL_PROP, zkQuorum);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkBasePath(String zkBasePath) {
+      props.setProperty(ZK_BASE_PATH_PROP, zkBasePath);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkPort(String zkPort) {
+      props.setProperty(ZK_PORT_PROP, zkPort);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkLockKey(String zkLockKey) {
+      props.setProperty(ZK_LOCK_KEY_PROP, zkLockKey);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkConnectionTimeoutInMs(Long connectionTimeoutInMs) {
+      props.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP, String.valueOf(connectionTimeoutInMs));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withZkSessionTimeoutInMs(Long sessionTimeoutInMs) {
+      props.setProperty(ZK_SESSION_TIMEOUT_MS_PROP, String.valueOf(sessionTimeoutInMs));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withNumRetries(int numRetries) {
+      props.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP, String.valueOf(numRetries));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withRetryWaitTimeInMillis(Long retryWaitTimeInMillis) {
+      props.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(retryWaitTimeInMillis));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withClientNumRetries(int clientNumRetries) {
+      props.setProperty(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP, String.valueOf(clientNumRetries));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withClientRetryWaitTimeInMillis(Long clientRetryWaitTimeInMillis) {
+      props.setProperty(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(clientRetryWaitTimeInMillis));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withLockWaitTimeInMillis(Long waitTimeInMillis) {
+      props.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP, String.valueOf(waitTimeInMillis));
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withConflictResolutionStrategy(ConflictResolutionStrategy conflictResolutionStrategy) {
+      props.setProperty(WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP, conflictResolutionStrategy.getClass().getName());
+      return this;
+    }
+
+    public HoodieLockConfig build() {

Review comment:
       checked
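
   For reference, the fluent pattern this builder follows can be exercised 
with a self-contained stand-in (the real calls would be 
`HoodieLockConfig.newBuilder().withZkQuorum(...)...build()`; the property keys 
below are illustrative, not Hudi's actual config keys):

```java
import java.util.Properties;

// Stand-in for HoodieLockConfig.Builder: same fluent, Properties-backed
// pattern as the hunk above. Key names are illustrative.
class SketchLockConfigBuilder {
  private final Properties props = new Properties();

  SketchLockConfigBuilder withZkQuorum(String quorum) {
    props.setProperty("hoodie.write.lock.zookeeper.url", quorum); // key name illustrative
    return this;
  }

  SketchLockConfigBuilder withNumRetries(int n) {
    props.setProperty("hoodie.write.lock.num_retries", String.valueOf(n)); // key name illustrative
    return this;
  }

  Properties build() {
    return props;
  }
}
```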

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class BaseCleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {
+
+  private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
+
+  private final Option<Map<String, String>> extraMetadata;
+
+  public BaseCleanPlanActionExecutor(HoodieEngineContext context,
+                                     HoodieWriteConfig config,
+                                     HoodieTable<T, I, K, O> table,
+                                     String instantTime,
+                                     Option<Map<String, String>> extraMetadata) {
+    super(context, config, table, instantTime);
+    this.extraMetadata = extraMetadata;
+  }
+
+  protected abstract Option<HoodieCleanerPlan> createCleanerPlan();
+
+  /**
+   * Generates List of files to be cleaned.
+   *
+   * @param context HoodieEngineContext
+   * @return Cleaner Plan
+   */
+  HoodieCleanerPlan requestClean(HoodieEngineContext context) {

Review comment:
       Yes, no change.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -195,30 +126,24 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
 
   @Override
   public HoodieCleanMetadata execute() {
+    List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
     // If there are inflight(failed) or previously requested clean operation, first perform them
     List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
         .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
     if (pendingCleanInstants.size() > 0) {
       pendingCleanInstants.forEach(hoodieInstant -> {
         LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
         try {
-          runPendingClean(table, hoodieInstant);
+          cleanMetadataList.add(runPendingClean(table, hoodieInstant));
         } catch (Exception e) {
           LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
         }
       });
       table.getMetaClient().reloadActiveTimeline();
     }
-
-    // Plan and execute a new clean action
-    Option<HoodieCleanerPlan> cleanerPlanOpt = requestClean(instantTime);
-    if (cleanerPlanOpt.isPresent()) {
-      table.getMetaClient().reloadActiveTimeline();
-      HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
-      if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null) && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()) {
-        return runClean(table, HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan);
-      }
-    }
-    return null;
+    // return the last clean metadata for now
+    // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services

Review comment:
       Currently, the clean metadata from `runPendingClean` is never returned 
if you see above in line 205. The current logic is as follows 
   1) For all pending clean operations, we just return `null`. 
   2) If there is new clean to be done, we do the new clean and return the 
metadata.
   
   Clean metadata is always persisted before this method inside the `runClean` 
method and the return value of this method is NOT used by the client. 
   
   I made above changes to reuse the same methods and  to keep the same 
behavior, except with one change :
   
   1) For all pending clean operations, we return the latest pending clean from 
previous runs. 
   2) If there is new clean to be done, we do the new clean and return the 
metadata.
   
   Other logic remains the same. The returned metadata is ONLY used for a) 
Logging b) Metrics. 
   
   Let's refactor all these issues in the ActionExecutor in follow up PR. Filed 
issue here -> https://issues.apache.org/jira/browse/HUDI-1666 
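
   The described return behavior can be sketched as: finish any pending 
cleans and return the last of their metadata, otherwise return the metadata 
of the freshly executed clean, if any. Self-contained stand-in, with clean 
metadata reduced to strings:

```java
import java.util.List;
import java.util.Optional;

// Illustrative: mirror the return-value behavior described above.
class SketchCleanReturn {
  static Optional<String> execute(List<String> pendingCleanResults, Optional<String> newCleanResult) {
    if (!pendingCleanResults.isEmpty()) {
      // Return the latest pending clean's metadata, as the comment describes.
      return Optional.of(pendingCleanResults.get(pendingCleanResults.size() - 1));
    }
    // Metadata of the freshly executed clean, if one was planned.
    return newCleanResult;
  }
}
```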
   

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -359,10 +388,21 @@ public abstract O bulkInsertPreppedRecords(I 
preppedRecords, final String instan
    * Common method containing steps to be performed before write 
(upsert/insert/...
    * @param instantTime
    * @param writeOperationType
+   * @param metaClient
    */
-  protected void preWrite(String instantTime, WriteOperationType 
writeOperationType) {
+  protected void preWrite(String instantTime, WriteOperationType 
writeOperationType,
+      HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
-    syncTableMetadata();
+    
this.txnManager.setLastCompletedTransaction(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+        .lastInstant());
+    LOG.info("Last Instant Cached by writer with instant " + instantTime + " 
is " + this.txnManager.getLastCompletedTransactionOwner());
+    this.txnManager.setTransactionOwner(Option.of(new 
HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)));
+    this.txnManager.beginTransaction();
+    try {
+      syncTableMetadata();
+    } finally {
+      this.txnManager.endTransaction();

Review comment:
       Right now, `endTransaction()` does the job of both end & abort; there 
is no difference in behavior. Both ensure that if the lock was acquired, it is 
released and other state is cleaned up.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -117,12 +126,24 @@ protected String getCommitActionType() {
   protected void commitOnAutoCommit(HoodieWriteMetadata result) {
     if (config.shouldAutoCommit()) {
       LOG.info("Auto commit enabled: Committing " + instantTime);
-      commit(extraMetadata, result);
+      autoCommit(extraMetadata, result);
     } else {
       LOG.info("Auto commit disabled for " + instantTime);
     }
   }
 
+  protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
+    this.txnManager.beginTransaction();
+    try {
+      // TODO : Refactor this method so we can pass a valid metadata table writer

Review comment:
       Removed metadata writer for now.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -401,13 +441,25 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   public void syncTableMetadata() {
     // Open up the metadata table again, for syncing
-    try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
+    try {
+      HoodieTableMetadataWriter writer =

Review comment:
       Reverted

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
##########
@@ -222,6 +225,17 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
     LOG.info("Committing metadata bootstrap !!");
   }
 
+  @Override
+  protected void syncTableMetadata() {

Review comment:
       This is required due to the `autoCommit` code in 
`BaseCommitActionExecutor`. We already have to take a lock in 
`BaseCommitActionExecutor` for committing the data, so instead of taking a 
lock again at the write client level, I have moved this sync into the same 
critical section as the commit.
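
   The "same critical section" point can be sketched as: take the lock once 
and perform both the data commit and the metadata-table sync before 
releasing, rather than locking twice. Stdlib stand-ins for the actual lock 
and operations:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative: one lock scope covering both the commit and the metadata sync.
class SketchCriticalSection {
  static List<String> commitWithSync(ReentrantLock lock) {
    List<String> steps = new ArrayList<>();
    lock.lock();
    try {
      steps.add("commit data");         // stands in for the executor's commit
      steps.add("sync table metadata"); // moved into the same lock scope
    } finally {
      lock.unlock();
    }
    return steps;
  }
}
```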

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -265,6 +266,20 @@ public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
     throw new HoodieNotSupportedException("Bootstrap is not supported yet");
   }
 
+  /**
+   * TODO :
+   * Refactor {@link FlinkCleanActionExecutor} to support scheduling of cleaning.
+   * @param context HoodieEngineContext
+   * @param instantTime Instant Time for scheduling cleaning
+   * @param extraMetadata additional metadata to write into plan
+   * @return
+   */
+  @Override
+  public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {

Review comment:
       Yeah, had a TODO on this, addressed it now.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -401,13 +441,25 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   public void syncTableMetadata() {
     // Open up the metadata table again, for syncing
-    try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
+    try {
+      HoodieTableMetadataWriter writer =
+          SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
       LOG.info("Successfully synced to metadata table");
     } catch (Exception e) {
       throw new HoodieMetadataException("Error syncing to metadata table.", e);
     }
   }
 
+  @Override
+  protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+    // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
+    HoodieTable table = createTable(config, hadoopConf);

Review comment:
       Removed the metadata writer. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/LockManager.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class wraps implementations of {@link LockProvider} and provides an 
easy way to manage the lifecycle of a lock.
+ */
+public class LockManager implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(LockManager.class);
+  private final HoodieWriteConfig writeConfig;
+  private final LockConfiguration lockConfiguration;
+  private final SerializableConfiguration hadoopConf;
+  private volatile LockProvider lockProvider;
+  // Holds the latest completed write instant to know which ones to check 
conflict against
+  private final AtomicReference<Option<HoodieInstant>> 
latestCompletedWriteInstant;
+
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+    this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty());
+    this.writeConfig = writeConfig;
+    this.hadoopConf = new SerializableConfiguration(fs.getConf());
+    this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
+  }
+
+  public void lock() {
+    if 
(writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      LockProvider lockProvider = getLockProvider();
+      boolean acquired = false;
+      try {
+        int retries = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP);
+        long waitTimeInMs = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP);
+        int retryCount = 0;
+        while (retryCount <= retries) {
+          acquired = 
lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), 
TimeUnit.MILLISECONDS);
+          if (acquired) {
+            break;
+          }
+          LOG.info("Retrying...");
+          Thread.sleep(waitTimeInMs);
+          retryCount++;
+        }
+      } catch (Exception e) {
+        throw new HoodieLockException("Unable to acquire lock ", e);
+      }
+      if (!acquired) {
+        throw new HoodieLockException("Unable to acquire lock, lock object " + 
lockProvider.getLock());
+      }
+    }
+  }
+
+  public void unlock() {
+    if 
(writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      getLockProvider().unlock();

Review comment:
       Yes, both providers' underlying implementations return void but throw 
exceptions on failure.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for 
concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+    implements ConflictResolutionStrategy {
+
+  private static final Logger LOG = 
LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline 
activeTimeline, HoodieInstant currentInstant,
+                                                 Option<HoodieInstant> 
lastSuccessfulInstant) {
+
+    // To find which instants are conflicting, we apply the following logic
+    // 1. Get completed instants timeline only for commits that have happened 
since the last successful write.
+    // 2. Get any scheduled or completed compaction or clustering operations 
that have started and/or finished
+    // after the current instant. We need to check for write conflicts since 
they may have mutated the same files
+    // that are being newly created by the current write.
+    Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+        .getCommitsTimeline()
+        // TODO : getWriteTimeline to ensure we include replace commits as well
+        .filterCompletedInstants()
+        .findInstantsAfter(lastSuccessfulInstant.isPresent() ? 
lastSuccessfulInstant.get().getTimestamp() : "0")

Review comment:
       Used `HoodieTimeline.INIT_INSTANT_TS` now.
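The change replaces the bare `"0"` literal with a named sentinel constant for "before any instant". A sketch of the fallback pattern, using `java.util.Optional` in place of Hudi's `Option` and a placeholder value for the constant (the real one lives in `HoodieTimeline`):

```java
import java.util.Optional;

// Sketch: when no prior successful instant exists, fall back to a named
// sentinel timestamp instead of a magic "0" literal.
public class Main {
    // Placeholder sentinel for illustration; the actual value is defined by
    // HoodieTimeline.INIT_INSTANT_TS in Hudi.
    static final String INIT_INSTANT_TS = "00000000000000";

    static String resolveStartTs(Optional<String> lastSuccessfulInstantTs) {
        return lastSuccessfulInstantTs.orElse(INIT_INSTANT_TS);
    }

    public static void main(String[] args) {
        System.out.println(resolveStartTs(Optional.empty()));
        System.out.println(resolveStartTs(Optional.of("20210305121500")));
    }
}
```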

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/LockManager.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class wraps implementations of {@link LockProvider} and provides an 
easy way to manage the lifecycle of a lock.
+ */
+public class LockManager implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(LockManager.class);
+  private final HoodieWriteConfig writeConfig;
+  private final LockConfiguration lockConfiguration;
+  private final SerializableConfiguration hadoopConf;
+  private volatile LockProvider lockProvider;
+  // Holds the latest completed write instant to know which ones to check 
conflict against
+  private final AtomicReference<Option<HoodieInstant>> 
latestCompletedWriteInstant;
+
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+    this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty());
+    this.writeConfig = writeConfig;
+    this.hadoopConf = new SerializableConfiguration(fs.getConf());
+    this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
+  }
+
+  public void lock() {
+    if 
(writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      LockProvider lockProvider = getLockProvider();
+      boolean acquired = false;
+      try {
+        int retries = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP);
+        long waitTimeInMs = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP);
+        int retryCount = 0;
+        while (retryCount <= retries) {
+          acquired = 
lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), 
TimeUnit.MILLISECONDS);
+          if (acquired) {
+            break;
+          }
+          LOG.info("Retrying...");
+          Thread.sleep(waitTimeInMs);
+          retryCount++;
+        }
+      } catch (Exception e) {

Review comment:
       If the lock was accepted by the LockProvider server but an 
InterruptedException happens, we rely on the fact that the lock will time out 
after X minutes (settings in Hive Metastore & Zookeeper). I have tested this in 
my production runs. 
   
   I have added some special checks for Hive Metastore in case of an 
InterruptedException, but for Zookeeper it's not possible to do those checks. 
   
   ```
    acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
             if (acquired) {
               break;
             }
   ```
   
   Another extremely low-probability case in the above code: the lock is 
acquired but the running thread gets interrupted before it can break. Again, in 
this case we just rely on the lock timeout on the server side of the 
LockProviders. 
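The interruption handling being discussed can be sketched as a generic retry loop: on interruption, restore the thread's interrupt status and bail out, trusting the server-side lock lease/timeout to release anything already granted. This is an illustrative pattern, not the exact `LockManager` code:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Sketch: bounded-retry lock acquisition. If we are interrupted between
// attempts, we re-set the interrupt flag and return false; any lock the
// server may already have granted is assumed to expire via its own timeout
// (e.g. a Zookeeper session or a Hive Metastore lock TTL).
public class Main {
    static boolean tryAcquireWithRetries(BooleanSupplier tryLock, int retries, long waitMs) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            if (tryLock.getAsBoolean()) {
                return true;                        // acquired
            }
            try {
                TimeUnit.MILLISECONDS.sleep(waitMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status for callers
                return false;                       // rely on server-side lock expiry
            }
        }
        return false;                               // retries exhausted
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice, succeeds on the third attempt.
        boolean ok = tryAcquireWithRetries(() -> ++calls[0] == 3, 5, 1L);
        System.out.println(ok + " after " + calls[0] + " attempts");
    }
}
```

Restoring the interrupt flag (rather than swallowing the exception) is the standard idiom, so that upstream code can still observe the cancellation.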

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;

Review comment:
       ZK & Curator come in via hbase-server. Do you want me to add them 
explicitly in the bundles?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.HoodieCommitOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class TransactionUtils {
+
+  private static final Logger LOG = 
LogManager.getLogger(TransactionUtils.class);
+
+  /**
+   * Resolve any write conflicts when committing data.
+   * @param table
+   * @param metadataWriter
+   * @param currentTxnOwnerInstant
+   * @param thisCommitMetadata
+   * @param config
+   * @param lastCompletedTxnOwnerInstant
+   * @return
+   * @throws HoodieWriteConflictException
+   */
+  public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final 
HoodieTable table, final Option<HoodieBackedTableMetadataWriter> metadataWriter,

Review comment:
       Replied above.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for 
concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+    implements ConflictResolutionStrategy {
+
+  private static final Logger LOG = 
LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline 
activeTimeline, HoodieInstant currentInstant,
+                                                 Option<HoodieInstant> 
lastSuccessfulInstant) {
+
+    // To find which instants are conflicting, we apply the following logic
+    // 1. Get completed instants timeline only for commits that have happened 
since the last successful write.
+    // 2. Get any scheduled or completed compaction or clustering operations 
that have started and/or finished
+    // after the current instant. We need to check for write conflicts since 
they may have mutated the same files
+    // that are being newly created by the current write.
+    Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+        .getCommitsTimeline()
+        // TODO : getWriteTimeline to ensure we include replace commits as well
+        .filterCompletedInstants()
+        .findInstantsAfter(lastSuccessfulInstant.isPresent() ? 
lastSuccessfulInstant.get().getTimestamp() : "0")
+        .getInstants();
+
+    Stream<HoodieInstant> compactionAndClusteringTimeline = activeTimeline
+        .getTimelineOfActions(CollectionUtils.createSet(REPLACE_COMMIT_ACTION, 
COMPACTION_ACTION))
+        .findInstantsAfter(currentInstant.getTimestamp())
+        .getInstants();
+    return Stream.concat(completedCommitsInstantStream, 
compactionAndClusteringTimeline);
+  }
+
+  @Override
+  public boolean hasConflict(HoodieCommitOperation thisOperation, 
HoodieCommitOperation otherOperation) {
+    // TODO : UUID's can clash even for insert/insert, handle that case.
+    Set<String> fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
+    Set<String> fileIdsSetForSecondInstant = 
otherOperation.getMutatedFileIds();
+    Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
+    intersection.retainAll(fileIdsSetForSecondInstant);
+    if (!intersection.isEmpty()) {
+      LOG.error("Found conflicting writes between first operation = " + 
thisOperation
+          + ", second operation = " + otherOperation + " , intersecting file 
ids " + intersection);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> 
resolveConflict(Option<HoodieBackedTableMetadataWriter> metadataWriter, 
HoodieTable table,
+                                              HoodieCommitOperation 
thisOperation, HoodieCommitOperation otherOperation) {
+    // NOTE that any commits from table services such as compaction, 
clustering or cleaning since the
+    // overlapping of files is handled using MVCC. Since compaction is 
eventually written as commit, we need to ensure
+    // we handle this during conflict resolution and not treat the commit from 
compaction operation as a regular commit.
+    if (otherOperation.getOperationType() == WriteOperationType.UNKNOWN

Review comment:
       Good catch. I forgot that a replace commit can result from 
INSERT_OVERWRITE as well; changed the logic to be more specific. 
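The conflict check being refined here boils down to: commits from table services (compaction/clustering) are exempt because their file overlaps are reconciled via MVCC, while regular writes conflict if their mutated file-ID sets intersect. A self-contained sketch of that logic, with an invented `OpType` enum standing in for Hudi's `WriteOperationType`:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the file-level conflict rule from the diff above: two writers
// conflict only when (a) the other writer is NOT a table service, and
// (b) the sets of file IDs they mutated overlap.
public class Main {
    enum OpType { UPSERT, INSERT, INSERT_OVERWRITE, COMPACT, CLUSTER }

    static boolean isTableService(OpType op) {
        return op == OpType.COMPACT || op == OpType.CLUSTER;
    }

    static boolean hasConflict(Set<String> thisFiles, Set<String> otherFiles, OpType otherOp) {
        if (isTableService(otherOp)) {
            return false;                       // overlap with table services is handled via MVCC
        }
        Set<String> intersection = new HashSet<>(thisFiles);
        intersection.retainAll(otherFiles);     // file IDs mutated by both writers
        return !intersection.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(hasConflict(Set.of("f1", "f2"), Set.of("f2", "f3"), OpType.UPSERT));
        System.out.println(hasConflict(Set.of("f1", "f2"), Set.of("f2"), OpType.COMPACT));
    }
}
```

Note the subtlety discussed in the comment: a replace commit can come from either clustering (a table service) or INSERT_OVERWRITE (a regular write), so the operation type, not just the action name on the timeline, must drive the exemption.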

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to 
lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able 
to use this lock.
+ */
+@NotThreadSafe
+public class ZookeeperBasedLockProvider extends LockProvider {
+
+  private static final Logger LOG = 
LogManager.getLogger(ZookeeperBasedLockProvider.class);
+
+  private CuratorFramework curatorFrameworkClient;
+  private volatile InterProcessMutex lock = null;
+
+  public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, 
final Configuration conf) {
+    this(lockConfiguration);
+    this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
+        
.connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP))
+        .retryPolicy(new 
BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
+            5000, 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP)))
+        
.sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP,
 DEFAULT_ZK_SESSION_TIMEOUT_MS))
+        
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP,
 DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
+        .build();
+    this.curatorFrameworkClient.start();
+  }
+
+  public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, 
final CuratorFramework curatorFrameworkClient) {
+    this(lockConfiguration);
+    this.curatorFrameworkClient = curatorFrameworkClient;
+    synchronized (this.curatorFrameworkClient) {
+      if (this.curatorFrameworkClient.getState() != 
CuratorFrameworkState.STARTED) {
+        this.curatorFrameworkClient.start();
+      }
+    }
+  }
+
+  ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration) {
+    checkRequiredProps(lockConfiguration);
+    this.lockConfiguration = lockConfiguration;
+  }
+  
+  public void acquireLock(long time, TimeUnit unit) throws Exception {
+    ValidationUtils.checkArgument(this.lock == null, 
LockState.ALREADY_ACQUIRED.name());
+    InterProcessMutex newLock = new InterProcessMutex(
+        this.curatorFrameworkClient, 
lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP) + "/"
+        + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP));
+    newLock.acquire(time, unit);
+    if (newLock.isAcquiredInThisProcess()) {
+      lock = newLock;
+    }
+  }
+
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) {
+    LOG.info(generateLogStatement(LockState.ACQUIRING, 
generateLogSuffixString()));
+    try {
+      acquireLock(time, unit);
+      LOG.info(generateLogStatement(LockState.ACQUIRED, 
generateLogSuffixString()));
+    } catch (Exception e) {
+      throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, 
generateLogSuffixString()), e);

Review comment:
       This is not the code on my local branch. Something went amiss during the 
rebase and squash of commits on my local. I have re-done the merge to ensure 
nothing got lost. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for 
concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+    implements ConflictResolutionStrategy {
+
+  private static final Logger LOG = 
LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline 
activeTimeline, HoodieInstant currentInstant,
+                                                 Option<HoodieInstant> 
lastSuccessfulInstant) {
+
+    // To find which instants are conflicting, we apply the following logic
+    // 1. Get completed instants timeline only for commits that have happened 
since the last successful write.
+    // 2. Get any scheduled or completed compaction or clustering operations 
that have started and/or finished
+    // after the current instant. We need to check for write conflicts since 
they may have mutated the same files
+    // that are being newly created by the current write.
+    Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+        .getCommitsTimeline()
+        // TODO : getWriteTimeline to ensure we include replace commits as well
+        .filterCompletedInstants()
+        .findInstantsAfter(lastSuccessfulInstant.isPresent() ? 
lastSuccessfulInstant.get().getTimestamp() : "0")
+        .getInstants();
+
+    Stream<HoodieInstant> compactionAndClusteringTimeline = activeTimeline
+        .getTimelineOfActions(CollectionUtils.createSet(REPLACE_COMMIT_ACTION, 
COMPACTION_ACTION))
+        .findInstantsAfter(currentInstant.getTimestamp())
+        .getInstants();
+    return Stream.concat(completedCommitsInstantStream, 
compactionAndClusteringTimeline);
+  }
+
+  @Override
+  public boolean hasConflict(HoodieCommitOperation thisOperation, HoodieCommitOperation otherOperation) {
+    // TODO : UUID's can clash even for insert/insert, handle that case.
+    Set<String> fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
+    Set<String> fileIdsSetForSecondInstant = otherOperation.getMutatedFileIds();
+    Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
+    intersection.retainAll(fileIdsSetForSecondInstant);
+    if (!intersection.isEmpty()) {
+      LOG.error("Found conflicting writes between first operation = " + thisOperation

Review comment:
       I can make it `WARN`. This is useful to debug issues and INFO might get 
ignored.
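For context: the overlap check in the diff boils down to a plain set intersection over mutated file ids. A minimal standalone sketch of the same idea (simplified hypothetical types and method names, not the actual `HoodieCommitOperation` API):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ConflictCheckSketch {

  // Two writes conflict when they touched at least one common file id.
  public static boolean hasConflict(Set<String> firstMutatedFileIds, Set<String> secondMutatedFileIds) {
    Set<String> intersection = new HashSet<>(firstMutatedFileIds); // copy so the input is not mutated
    intersection.retainAll(secondMutatedFileIds);                  // keep only the common file ids
    return !intersection.isEmpty();
  }

  public static void main(String[] args) {
    Set<String> writer = new HashSet<>(Arrays.asList("f1", "f2"));
    Set<String> cluster = new HashSet<>(Arrays.asList("f2", "f3"));
    Set<String> other = new HashSet<>(Arrays.asList("f4"));
    System.out.println(hasConflict(writer, cluster)); // true, both touched f2
    System.out.println(hasConflict(writer, other));   // false, disjoint file ids
  }
}
```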

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.lock.SimpleConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.lock.LockProvider;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS;

Review comment:
       These configs are cross-used by `HiveMetastoreLockProvider`, which does not depend on hudi-client-common. Hence they live in `hudi-common`.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/HoodieMetadataConversionUtils.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.ReplaceArchivalHelper;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import java.io.IOException;
+
+/**
+ * Helper class to convert between different action related payloads and {@link HoodieArchivedMetaEntry}.
+ */
+public class HoodieMetadataConversionUtils {
+
+  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant, HoodieTableMetaClient metaClient) throws IOException {
+    HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
+    archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
+    archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+    switch (hoodieInstant.getAction()) {
+      case HoodieTimeline.CLEAN_ACTION: {
+        if (hoodieInstant.isCompleted()) {
+          archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, hoodieInstant));
+        } else {
+          archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, hoodieInstant));
+        }
+        archivedMetaWrapper.setActionType(ActionType.clean.name());
+        break;
+      }
+      case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.DELTA_COMMIT_ACTION: {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+                .fromBytes(metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
+        archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
+        archivedMetaWrapper.setActionType(ActionType.commit.name());
+        break;
+      }
+      case HoodieTimeline.REPLACE_COMMIT_ACTION: {
+        HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
+                .fromBytes(metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
+        archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
+        break;
+      }
+      case HoodieTimeline.ROLLBACK_ACTION: {
+        archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
+                metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieRollbackMetadata.class));
+        archivedMetaWrapper.setActionType(ActionType.rollback.name());
+        break;
+      }
+      case HoodieTimeline.SAVEPOINT_ACTION: {
+        archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
+                metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieSavepointMetadata.class));
+        archivedMetaWrapper.setActionType(ActionType.savepoint.name());
+        break;
+      }
+      case HoodieTimeline.COMPACTION_ACTION: {
+        HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, hoodieInstant.getTimestamp());
+        archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        archivedMetaWrapper.setActionType(ActionType.compaction.name());
+        break;
+      }
+      default: {
+        throw new UnsupportedOperationException("Action not fully supported yet");
+      }
+    }
+    return archivedMetaWrapper;
+  }
+
+  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant,
+                                                          HoodieCommitMetadata hoodieCommitMetadata) {
+    HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
+    archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
+    archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+    archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(hoodieCommitMetadata));
+    archivedMetaWrapper.setActionType(ActionType.commit.name());
+    return archivedMetaWrapper;
+  }
+
+  public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata(
+          HoodieCommitMetadata hoodieCommitMetadata) {
+    ObjectMapper mapper = new ObjectMapper();
+    // Need this to ignore other public get() methods
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    org.apache.hudi.avro.model.HoodieCommitMetadata avroMetaData =
+            mapper.convertValue(hoodieCommitMetadata, org.apache.hudi.avro.model.HoodieCommitMetadata.class);
+    // Do not archive Rolling Stats, cannot set to null since AVRO will throw null pointer
+    avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, "");
+    return avroMetaData;
+  }
+
+  // TODO : Fix converting from SpecificRecord to POJO

Review comment:
       I've removed the method for now to be sure no one uses it. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -43,7 +43,8 @@
   public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy";
   public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
   public static final String ASYNC_CLEAN_PROP = "hoodie.clean.async";
-
+  // Turn on inline cleaning
+  public static final String INLINE_CLEAN_PROP = "hoodie.clean.inline";

Review comment:
       I was trying to keep the same `inline` concept used for clustering and compaction. The problem is that there are autoClean & autoCommit but no `autoCompact` or `autoCluster` etc. Additionally, we used `inline` as the flag to toggle between inline & async for compaction & clustering, while we have chosen `async` as the flag for cleaning.
   
   I have removed `hoodie.clean.inline`. We can address these in another PR after we decide what convention to follow.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -114,6 +115,7 @@
       HoodieFailedWritesCleaningPolicy.EAGER.name();
   private static final String DEFAULT_AUTO_CLEAN = "true";
   private static final String DEFAULT_ASYNC_CLEAN = "false";
+  private static final String DEFAULT_INLINE_CLEAN = DEFAULT_AUTO_CLEAN;

Review comment:
       done

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Configuration for managing locks. Since this configuration needs to be shared with HiveMetaStore based lock,
+ * which is in a different package than other lock providers, we use this as a data transfer object in hoodie-common
+ */
+public class LockConfiguration implements Serializable {
+
+  public static final String LOCK_PREFIX = "hoodie.writer.lock.";
+  public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "wait_time_ms_between_retry";
+  public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(5000L);
+  public static final String LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "client.wait_time_ms_between_retry";
+  public static final String DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(10000L);
+  public static final String LOCK_ACQUIRE_NUM_RETRIES_PROP = LOCK_PREFIX + "num_retries";
+  public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES = String.valueOf(3);
+  public static final String LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP = LOCK_PREFIX + "client.num_retries";
+  public static final String DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES = String.valueOf(0);
+  public static final String LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP = LOCK_PREFIX + "wait_time_ms";
+  public static final int DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS = 60 * 1000;
+  // configs for file system based locks. NOTE: This only works for DFS with atomic create/delete operation
+  public static final String FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "filesystem.";
+  public static final String FILESYSTEM_LOCK_PATH_PROP = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "path";
+  // configs for metastore based locks
+  public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "hivemetastore.";
+  public static final String HIVE_DATABASE_NAME_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "database";
+  public static final String HIVE_TABLE_NAME_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "table";
+  // Zookeeper configs for zk based locks
+  public static final String ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "zookeeper.";
+  public static final String ZK_BASE_PATH_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "zk_base_path";

Review comment:
       This is the base path for the zk lock which users can select. I have not 
exposed a config to change the chroot. This will be the default.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -66,6 +70,11 @@ public BaseCommitActionExecutor(HoodieEngineContext context, 
HoodieWriteConfig c
     this.operationType = operationType;
     this.extraMetadata = extraMetadata;
     this.taskContextSupplier = context.getTaskContextSupplier();
+    this.txnManager = new TransactionManager(config, 
table.getMetaClient().getFs());
+    // TODO : Remove this once we refactor and move out autoCommit method from 
here, since the TxnManager is held in {@link AbstractHoodieWriteClient}.

Review comment:
       https://issues.apache.org/jira/browse/HUDI-1665

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.HoodieCommitOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class TransactionUtils {
+
+  private static final Logger LOG = LogManager.getLogger(TransactionUtils.class);
+
+  /**
+   * Resolve any write conflicts when committing data.
+   * @param table
+   * @param metadataWriter
+   * @param currentTxnOwnerInstant
+   * @param thisCommitMetadata
+   * @param config
+   * @param lastCompletedTxnOwnerInstant
+   * @return
+   * @throws HoodieWriteConflictException
+   */
+  public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final HoodieTable table, final Option<HoodieBackedTableMetadataWriter> metadataWriter,
+                                                                       final Option<HoodieInstant> currentTxnOwnerInstant, final Option<HoodieCommitMetadata> thisCommitMetadata,
+                                                                       final HoodieWriteConfig config, Option<HoodieInstant> lastCompletedTxnOwnerInstant)
+      throws HoodieWriteConflictException {
+    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
+      Stream<HoodieInstant> instantStream = resolutionStrategy.getInstantsStream(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant);
+      // TODO : metadataWriter.reload() inside resolve write conflict ??

Review comment:
       I put this TODO since we are going to need a way to use the 
MetadataWriter to manipulate any concurrent actions performed, we will address 
this use-case in a follow up PR. I have removed metadata writer for now.
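For context: the control flow of `resolveWriteConflictIfAny` is to check the concurrency mode, ask the strategy for the candidate instants, and fail the commit on any file-id overlap. A simplified standalone sketch of that flow (all types and names below are hypothetical stand-ins; the real method operates on `HoodieInstant`s and timelines):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;

public class ResolveFlowSketch {

  public static class WriteConflictException extends RuntimeException {
    public WriteConflictException(String msg) { super(msg); }
  }

  // Stand-in for a candidate instant and the file ids it mutated
  public static class CommitOp {
    public final String instant;
    public final Set<String> mutatedFileIds;
    public CommitOp(String instant, Set<String> mutatedFileIds) {
      this.instant = instant;
      this.mutatedFileIds = mutatedFileIds;
    }
  }

  public static void resolveWriteConflictIfAny(boolean optimisticConcurrency,
                                               Set<String> currentFileIds,
                                               Stream<CommitOp> candidates) {
    if (!optimisticConcurrency) {
      return; // single-writer mode: nothing to check
    }
    candidates.forEach(op -> {
      Set<String> overlap = new HashSet<>(currentFileIds); // copy, do not mutate the input
      overlap.retainAll(op.mutatedFileIds);
      if (!overlap.isEmpty()) {
        throw new WriteConflictException("Conflict with " + op.instant + " on " + overlap);
      }
    });
  }
}
```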

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/lock/LockProvider.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Pluggable lock implementations extend this provider class.
+ */
+public abstract class LockProvider<T> implements Lock, AutoCloseable {

Review comment:
       I want to mark some methods as not implementable, which is why an abstract class was chosen.
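For context: some `java.util.concurrent.locks.Lock` methods do not map onto a coarse-grained distributed lock, so an abstract base can pin them as unsupported while leaving the rest to subclasses. A hedged sketch of that pattern (not the actual Hudi `LockProvider` body, which is elided from the diff):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

// Sketch: abstract base implementing Lock, forbidding operations that
// make no sense for a distributed lock provider.
public abstract class LockProviderSketch<T> implements Lock, AutoCloseable {

  // Subclasses (ZK, Hive Metastore, filesystem, ...) supply the real client.
  protected abstract T getLock();

  @Override
  public void lockInterruptibly() {
    // Interruptible acquisition is not meaningful for these providers
    throw new UnsupportedOperationException("lockInterruptibly is not supported");
  }

  @Override
  public Condition newCondition() {
    // Conditions require a local monitor; distributed locks have none
    throw new UnsupportedOperationException("newCondition is not supported");
  }

  @Override
  public abstract boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
}
```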

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
##########
@@ -82,6 +84,10 @@ public static WriteOperationType fromValue(String value) {
         return INSERT_OVERWRITE_TABLE;
       case "cluster":
         return CLUSTER;
+      case "compact":
+        return COMPACT;
+      case "unknown":

Review comment:
       This was introduced to handle older metadata, from before we stored the WriteOperationType in the commit metadata.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Different concurrency modes for write operations.
+ */
+public enum WriteConcurrencyMode {
+  // Only a single writer can perform write ops
+  SINGLE_WRITER("single_writer"),
+  // Multiple writers can perform write ops with lazy conflict resolution using locks
+  OPTIMISTIC_CONCURRENCY_CONTROL("optimistic_concurrency_control");
+
+  private final String value;
+
+  WriteConcurrencyMode(String value) {
+    this.value = value;
+  }
+
+  /**
+   * Getter for write concurrency mode.
+   * @return
+   */
+  public String value() {
+    return value;
+  }
+
+  /**
+   * Convert string value to WriteConcurrencyMode.
+   */
+  public static WriteConcurrencyMode fromValue(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "single_writer":
+        return SINGLE_WRITER;
+      case "optimistic_concurrency_control":
+        return OPTIMISTIC_CONCURRENCY_CONTROL;
+      default:
+        throw new HoodieException("Invalid value of Type.");
+    }
+  }
+
+  public boolean supportsOptimisticConcurrencyControl() {

Review comment:
       I feel `supportsOptimisticConcurrencyControl` is more direct, whereas `isOptimisticConcurrencyControl` sounds a little weird. Let me know if you have a strong preference.
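For context: the string-valued enum with a `fromValue` parser is a common pattern, and can be exercised in isolation. A standalone replica of the parsing shown in the diff (simplified, with `IllegalArgumentException` standing in for `HoodieException`):

```java
import java.util.Locale;

public enum WriteConcurrencyModeSketch {
  SINGLE_WRITER("single_writer"),
  OPTIMISTIC_CONCURRENCY_CONTROL("optimistic_concurrency_control");

  private final String value;

  WriteConcurrencyModeSketch(String value) {
    this.value = value;
  }

  public String value() {
    return value;
  }

  // Case-insensitive parse, mirroring fromValue in the diff
  public static WriteConcurrencyModeSketch fromValue(String value) {
    for (WriteConcurrencyModeSketch mode : values()) {
      if (mode.value.equals(value.toLowerCase(Locale.ROOT))) {
        return mode;
      }
    }
    throw new IllegalArgumentException("Invalid concurrency mode: " + value);
  }

  public boolean supportsOptimisticConcurrencyControl() {
    return this == OPTIMISTIC_CONCURRENCY_CONTROL;
  }
}
```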

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
-    return getTableAndInitCtx(metaClient, operationType);
+    if (HoodieTableVersion.current() != metaClient.getTableConfig().getTableVersion()) {

Review comment:
       Added

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
-    return getTableAndInitCtx(metaClient, operationType);
+    if (HoodieTableVersion.current() != metaClient.getTableConfig().getTableVersion()) {
+      // TODO : Force clean up of all inflights, do this once pending rollback removal PR is landed
+      // this.rollbackFailedWrites();
+      this.txnManager.beginTransaction();
+      try {
+        // Ensure no inflight commits
+        TransactionUtils.resolveConflictIfAnyForUpgradeDowngrade(metaClient);
+        new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
+      } finally {
+        this.txnManager.endTransaction();
+      }
+    }
+    return getTableAndInitCtx(metaClient, operationType, instantTime);
   }
 
-  private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
+  // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy

Review comment:
       So basically, the following is a production use-case for us:
   
   1) Writer starts to write fresh data to files f1, f2; c1 is inflight
   2) Clustering is scheduled as c2.cluster for files f1, f2
   3) c1 and c2 are in progress
   4) c2.cluster finishes
   5) c1 attempts to finish, notices that c2 has overlapping file ids, and aborts
   
   We want c1 to take priority over c2 to avoid violating our freshness SLA. A design and PR for this will follow after this PR lands.
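For context: the transaction guard around the upgrade path in the diff above follows the usual acquire-in-begin, release-in-finally shape. A minimal sketch with a hypothetical lock-backed manager (not the actual Hudi `TransactionManager` API):

```java
import java.util.concurrent.locks.ReentrantLock;

public class TxnGuardSketch {

  // Stand-in for a TransactionManager backed by a lock provider
  public static class TransactionManager {
    private final ReentrantLock lock = new ReentrantLock();
    public void beginTransaction() { lock.lock(); }
    public void endTransaction() { lock.unlock(); }
    public boolean inTransaction() { return lock.isLocked(); }
  }

  public static void runGuarded(TransactionManager txnManager, Runnable criticalSection) {
    txnManager.beginTransaction();
    try {
      criticalSection.run(); // e.g. conflict check + upgrade/downgrade
    } finally {
      // Always release, even if the critical section throws
      txnManager.endTransaction();
    }
  }
}
```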




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

