n3nash commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r587777089
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -30,16 +31,19 @@
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.lock.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableService;
Review comment:
I kept it here because WriteOperationType is also here.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -359,10 +388,21 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
* Common method containing steps to be performed before write (upsert/insert/...
* @param instantTime
* @param writeOperationType
+ * @param metaClient
*/
- protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
+ protected void preWrite(String instantTime, WriteOperationType writeOperationType,
+                         HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
- syncTableMetadata();
+ this.txnManager.setLastCompletedTransaction(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant());
+ LOG.info("Last Instant Cached by writer with instant " + instantTime + " is " + this.txnManager.getLastCompletedTransactionOwner());
+ this.txnManager.setTransactionOwner(Option.of(new HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)));
+ this.txnManager.beginTransaction();
Review comment:
I also realized this during implementation but wanted to keep the `beginTransaction(..)` API simple. I've added an overloaded method now.
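To make the overload idea concrete, here is a minimal, self-contained sketch (toy names, not Hudi's actual `TransactionManager`): the no-arg `beginTransaction()` stays simple for existing callers, while the overload also records the owner instant in one call.

```java
import java.util.Optional;

class TxnManager {
    private Optional<String> owner = Optional.empty();
    private boolean active = false;

    // Simple API, unchanged for existing callers.
    void beginTransaction() {
        active = true;
    }

    // Overload: records the owning instant before starting the transaction.
    void beginTransaction(Optional<String> currentOwner) {
        this.owner = currentOwner;
        beginTransaction();
    }

    boolean isActive() { return active; }
    Optional<String> getOwner() { return owner; }
}
```

Callers that don't care about ownership keep the old one-liner; the write client calls the overload with its inflight instant.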
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConflictResolutionStrategy.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.stream.Stream;
+
+/**
+ * Strategy interface for conflict resolution with multiple writers.
+ * Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
+ * writers are mutating the hoodie table.
+ */
+public interface ConflictResolutionStrategy {
+
+ /**
+ * Stream of instants to check conflicts against.
+ * @return
+ */
+ Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
+
+ /**
+ * Implementations of this method will determine whether a conflict exists between 2 commits.
+ * @param thisOperation
+ * @param otherOperation
+ * @return
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ boolean hasConflict(HoodieCommitOperation thisOperation, HoodieCommitOperation otherOperation);
+
+ /**
+ * Implementations of this method will determine how to resolve a conflict between 2 commits.
+ * @param thisOperation
+ * @param otherOperation
+ * @return
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ Option<HoodieCommitMetadata> resolveConflict(Option<HoodieBackedTableMetadataWriter> metadataWriter, HoodieTable table,
Review comment:
So this is being passed to allow the metadata to be manipulated for some kind of conflict resolution. Right now it is not used anywhere; I can remove it, but it will need to be added back soon, once conflict resolution does more than just throw an exception.
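For readers following the `hasConflict` discussion: the "simple concurrent file writes" strategy boils down to checking whether two commits touched any common file group. A minimal, self-contained sketch of that check (hypothetical names, not the exact Hudi API):

```java
import java.util.HashSet;
import java.util.Set;

class SimpleFileConflictCheck {
    // Two commits conflict when the sets of file groups they wrote overlap.
    static boolean hasConflict(Set<String> thisFiles, Set<String> otherFiles) {
        Set<String> overlap = new HashSet<>(thisFiles);
        overlap.retainAll(otherFiles); // intersection of mutated file groups
        return !overlap.isEmpty();
    }
}
```

The real strategy extracts these file sets from each operation's commit metadata; `resolveConflict` then decides what to do when this returns true (currently: throw).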
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/HoodieCommitOperation.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is used to hold all information used to identify how to resolve conflicts between instants.
+ * Since we interchange payload types between AVRO specific records and POJO's, this object serves as
+ * a common payload to manage these conversions.
+ */
+public class HoodieCommitOperation {
Review comment:
So this just wraps the `CommitMetadata` into a common payload. `ConflictingOperation` suggests it is already a conflicting operation, which it is not yet. Open to other suggestions if you have any.
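The "common payload" point in a nutshell: conflict checks should see one shape regardless of whether the metadata came from an Avro-specific record or a POJO. A tiny illustrative sketch (names hypothetical; the real class also carries the `HoodieInstant`, operation type, etc.):

```java
import java.util.Map;
import java.util.Set;

class CommitOperation {
    // Normalized view: file group id -> full path written by this commit.
    private final Map<String, String> fileIdToFullPath;

    private CommitOperation(Map<String, String> fileIdToFullPath) {
        this.fileIdToFullPath = fileIdToFullPath;
    }

    // In the real code, factories like this would convert from
    // HoodieCommitMetadata (POJO) or the Avro archived record.
    static CommitOperation fromPojo(Map<String, String> writeStats) {
        return new CommitOperation(writeStats);
    }

    Set<String> getMutatedFileIds() {
        return fileIdToFullPath.keySet();
    }
}
```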
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able to use this lock.
+ */
+@NotThreadSafe
Review comment:
The synchronized method is just for testing, to start ZK; otherwise parallel tests end up triggering start multiple times, which causes issues. Since we don't use `@VisibleForTesting`, I've added a comment.
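The shape being described is a synchronized, idempotent start guard: even if several test threads race into it, only the first call actually starts the process. Toy stand-in (not the Curator client):

```java
class StartOnce {
    private boolean started = false;
    private int startCalls = 0;

    // Synchronized + idempotent: concurrent callers serialize here, and
    // only the first one performs the (expensive, non-reentrant) start.
    synchronized void startIfNeeded() {
        if (!started) {
            startCalls++; // real code would boot the embedded ZK here
            started = true;
        }
    }

    int actualStarts() { return startCalls; }
}
```

Without the `synchronized`, two threads could both observe `started == false` and both attempt the start, which is exactly the parallel-test failure mode mentioned above.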
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/HoodieMetadataConversionUtils.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.ReplaceArchivalHelper;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import java.io.IOException;
+
+/**
+ * Helper class to convert between different action related payloads and {@link HoodieArchivedMetaEntry}.
+ */
+public class HoodieMetadataConversionUtils {
Review comment:
Added one now
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.lock.SimpleConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.lock.LockProvider;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_PORT_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+
+/**
+ * Hoodie Configs for Locks.
+ */
+public class HoodieLockConfig extends DefaultHoodieConfig {
+
+ // Pluggable type of lock provider
+ public static final String LOCK_PROVIDER_CLASS_PROP = LOCK_PREFIX + "provider";
+ public static final String DEFAULT_LOCK_PROVIDER_CLASS = ZookeeperBasedLockProvider.class.getName();
+ // Pluggable strategies to use when resolving conflicts
+ public static final String WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP =
+ LOCK_PREFIX + "conflict.resolution.strategy";
+ public static final String DEFAULT_WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS =
+ SimpleConcurrentFileWritesConflictResolutionStrategy.class.getName();
+
+ private HoodieLockConfig(Properties props) {
+ super(props);
+ }
+
+ public static HoodieLockConfig.Builder newBuilder() {
+ return new HoodieLockConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private final Properties props = new Properties();
+
+ public HoodieLockConfig.Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.props.load(reader);
+ return this;
+ }
+ }
+
+ public HoodieLockConfig.Builder fromProperties(Properties props) {
+ this.props.putAll(props);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withLockProvider(Class<? extends LockProvider> lockProvider) {
+ props.setProperty(LOCK_PROVIDER_CLASS_PROP, lockProvider.getName());
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withHiveDatabaseName(String databaseName) {
+ props.setProperty(HIVE_DATABASE_NAME_PROP, databaseName);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withHiveTableName(String tableName) {
+ props.setProperty(HIVE_TABLE_NAME_PROP, tableName);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkQuorum(String zkQuorum) {
+ props.setProperty(ZK_CONNECT_URL_PROP, zkQuorum);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkBasePath(String zkBasePath) {
+ props.setProperty(ZK_BASE_PATH_PROP, zkBasePath);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkPort(String zkPort) {
+ props.setProperty(ZK_PORT_PROP, zkPort);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkLockKey(String zkLockKey) {
+ props.setProperty(ZK_LOCK_KEY_PROP, zkLockKey);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkConnectionTimeoutInMs(Long connectionTimeoutInMs) {
+ props.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP, String.valueOf(connectionTimeoutInMs));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkSessionTimeoutInMs(Long sessionTimeoutInMs) {
+ props.setProperty(ZK_SESSION_TIMEOUT_MS_PROP, String.valueOf(sessionTimeoutInMs));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withNumRetries(int numRetries) {
+ props.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP, String.valueOf(numRetries));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withRetryWaitTimeInMillis(Long retryWaitTimeInMillis) {
+ props.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(retryWaitTimeInMillis));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withClientNumRetries(int clientNumRetries) {
+ props.setProperty(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP, String.valueOf(clientNumRetries));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withClientRetryWaitTimeInMillis(Long clientRetryWaitTimeInMillis) {
+ props.setProperty(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(clientRetryWaitTimeInMillis));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withLockWaitTimeInMillis(Long waitTimeInMillis) {
+ props.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP, String.valueOf(waitTimeInMillis));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withConflictResolutionStrategy(ConflictResolutionStrategy conflictResolutionStrategy) {
+ props.setProperty(WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP, conflictResolutionStrategy.getClass().getName());
+ return this;
+ }
+
+ public HoodieLockConfig build() {
Review comment:
checked
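Since the question was about validation in `build()`: that method is the natural place to check or default the collected lock properties before constructing the immutable config. A minimal, self-contained sketch of that pattern (hypothetical property names, not the real `HoodieLockConfig`):

```java
import java.util.Properties;

class LockConfig {
    final Properties props;

    private LockConfig(Properties props) { this.props = props; }

    static class Builder {
        private final Properties props = new Properties();

        Builder withProvider(String className) {
            props.setProperty("hoodie.write.lock.provider", className);
            return this;
        }

        LockConfig build() {
            // Fill in a default when the caller did not set a provider;
            // a real build() could also validate required props here.
            props.putIfAbsent("hoodie.write.lock.provider",
                "org.example.ZookeeperBasedLockProvider");
            return new LockConfig(props);
        }
    }
}
```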
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class BaseCleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {
+
+ private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
+
+ private final Option<Map<String, String>> extraMetadata;
+
+ public BaseCleanPlanActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+                                    Option<Map<String, String>> extraMetadata) {
+ super(context, config, table, instantTime);
+ this.extraMetadata = extraMetadata;
+ }
+
+ protected abstract Option<HoodieCleanerPlan> createCleanerPlan();
+
+ /**
+ * Generates List of files to be cleaned.
+ *
+ * @param context HoodieEngineContext
+ * @return Cleaner Plan
+ */
+ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
Review comment:
Yes, no change.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -195,30 +126,24 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
@Override
public HoodieCleanMetadata execute() {
+ List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
// If there are inflight(failed) or previously requested clean operation, first perform them
List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
.filterInflightsAndRequested().getInstants().collect(Collectors.toList());
if (pendingCleanInstants.size() > 0) {
pendingCleanInstants.forEach(hoodieInstant -> {
LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
try {
- runPendingClean(table, hoodieInstant);
+ cleanMetadataList.add(runPendingClean(table, hoodieInstant));
} catch (Exception e) {
LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
}
});
table.getMetaClient().reloadActiveTimeline();
}
-
- // Plan and execute a new clean action
- Option<HoodieCleanerPlan> cleanerPlanOpt = requestClean(instantTime);
- if (cleanerPlanOpt.isPresent()) {
- table.getMetaClient().reloadActiveTimeline();
- HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
- if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null) && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()) {
- return runClean(table, HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan);
- }
- }
- return null;
+ // return the last clean metadata for now
+ // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
Review comment:
Currently, the clean metadata from `runPendingClean` is never returned (see line 205 above). The current logic is as follows:
1) For all pending clean operations, we just return `null`.
2) If there is a new clean to be done, we do the new clean and return its metadata.
Clean metadata is always persisted before this method returns, inside the `runClean` method, and the return value of this method is NOT used by the client.
I made the above changes to reuse the same methods and keep the same behavior, with one change:
1) For all pending clean operations, we return the latest pending clean metadata from previous runs.
2) If there is a new clean to be done, we do the new clean and return its metadata.
Other logic remains the same. The returned metadata is ONLY used for a) logging and b) metrics.
Let's refactor all these issues in the ActionExecutor in a follow-up PR. Filed an issue here -> https://issues.apache.org/jira/browse/HUDI-1666
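The control flow described above can be sketched in isolation like so (strings stand in for `HoodieCleanMetadata`, and the names are illustrative): finish all pending cleans, collect their metadata, and return the last one, which is only consumed for logging/metrics.

```java
import java.util.ArrayList;
import java.util.List;

class CleanRunner {
    // Returns the metadata of the last pending clean performed, or null
    // if there were no pending cleans (matching the old null behavior).
    static String executePendingCleans(List<String> pendingInstants) {
        List<String> cleanMetadataList = new ArrayList<>();
        for (String instant : pendingInstants) {
            // stand-in for runPendingClean(table, hoodieInstant)
            cleanMetadataList.add("cleaned-" + instant);
        }
        return cleanMetadataList.isEmpty()
            ? null
            : cleanMetadataList.get(cleanMetadataList.size() - 1);
    }
}
```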
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -359,10 +388,21 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
* Common method containing steps to be performed before write (upsert/insert/...
* @param instantTime
* @param writeOperationType
+ * @param metaClient
*/
- protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
+ protected void preWrite(String instantTime, WriteOperationType writeOperationType,
+                         HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
- syncTableMetadata();
+ this.txnManager.setLastCompletedTransaction(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant());
+ LOG.info("Last Instant Cached by writer with instant " + instantTime + " is " + this.txnManager.getLastCompletedTransactionOwner());
+ this.txnManager.setTransactionOwner(Option.of(new HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)));
+ this.txnManager.beginTransaction();
+ try {
+ syncTableMetadata();
+ } finally {
+ this.txnManager.endTransaction();
Review comment:
Right now, `endTransaction()` does the job of both end and abort; there is no difference in behavior. Both ensure that if the lock was acquired, it is released, and any other state is cleaned up.
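The try/finally shape shown in the diff is what makes a single end/abort method safe: the lock is released whether the guarded work succeeds or throws. A self-contained sketch of that pattern (toy lock-backed manager, not Hudi's `TransactionManager`):

```java
import java.util.concurrent.locks.ReentrantLock;

class Txn {
    private final ReentrantLock lock = new ReentrantLock();

    void begin() { lock.lock(); }

    // Serves as both "end" and "abort": release the lock only if held.
    void end() {
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }

    boolean runGuarded(Runnable work) {
        begin();
        try {
            work.run();
            return true;
        } finally {
            end(); // always releases, even when work throws
        }
    }

    boolean isLocked() { return lock.isLocked(); }
}
```

If end and abort ever need to diverge (e.g. abort also rolls back cached state), the finally block is the single place to swap in the distinction.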
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -117,12 +126,24 @@ protected String getCommitActionType() {
protected void commitOnAutoCommit(HoodieWriteMetadata result) {
if (config.shouldAutoCommit()) {
LOG.info("Auto commit enabled: Committing " + instantTime);
- commit(extraMetadata, result);
+ autoCommit(extraMetadata, result);
} else {
LOG.info("Auto commit disabled for " + instantTime);
}
}
+ protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
+ this.txnManager.beginTransaction();
+ try {
+ // TODO : Refactor this method so we can pass a valid metadata table writer
Review comment:
Removed metadata writer for now.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -401,13 +441,25 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
@Override
public void syncTableMetadata() {
// Open up the metadata table again, for syncing
- try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
+ try {
+ HoodieTableMetadataWriter writer =
Review comment:
Reverted
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
##########
@@ -222,6 +225,17 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
LOG.info("Committing metadata bootstrap !!");
}
+ @Override
+ protected void syncTableMetadata() {
Review comment:
This is required due to the `autoCommit` code in `BaseCommitActionExecutor`. We already have to take a lock in `BaseCommitActionExecutor` for committing the data; instead of taking a lock again at the write client level, I have moved this sync into the same critical section as the commit.
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -265,6 +266,20 @@ public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
throw new HoodieNotSupportedException("Bootstrap is not supported yet");
}
+ /**
+ * TODO :
+ * Refactor {@link FlinkCleanActionExecutor} to support scheduling of cleaning.
+ * @param context HoodieEngineContext
+ * @param instantTime Instant Time for scheduling cleaning
+ * @param extraMetadata additional metadata to write into plan
+ * @return
+ */
+ @Override
+ public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
Review comment:
Yeah, had a TODO on this, addressed it now.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -401,13 +441,25 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
@Override
public void syncTableMetadata() {
// Open up the metadata table again, for syncing
- try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
+ try {
+ HoodieTableMetadataWriter writer =
+ SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
LOG.info("Successfully synced to metadata table");
} catch (Exception e) {
throw new HoodieMetadataException("Error syncing to metadata table.", e);
}
}
+ @Override
+ protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+ // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+ // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
+ HoodieTable table = createTable(config, hadoopConf);
Review comment:
Removed the metadata writer.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/LockManager.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class wraps implementations of {@link LockProvider} and provides an
easy way to manage the lifecycle of a lock.
+ */
+public class LockManager implements Serializable {
+
+ private static final Logger LOG = LogManager.getLogger(LockManager.class);
+ private final HoodieWriteConfig writeConfig;
+ private final LockConfiguration lockConfiguration;
+ private final SerializableConfiguration hadoopConf;
+ private volatile LockProvider lockProvider;
+ // Holds the latest completed write instant to know which ones to check
conflict against
+ private final AtomicReference<Option<HoodieInstant>>
latestCompletedWriteInstant;
+
+ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+ this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty());
+ this.writeConfig = writeConfig;
+ this.hadoopConf = new SerializableConfiguration(fs.getConf());
+ this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
+ }
+
+  public void lock() {
+    if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      LockProvider lockProvider = getLockProvider();
+      boolean acquired = false;
+      try {
+        int retries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP);
+        long waitTimeInMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP);
+        int retryCount = 0;
+        while (retryCount <= retries) {
+          acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
+          if (acquired) {
+            break;
+          }
+          LOG.info("Retrying...");
+          Thread.sleep(waitTimeInMs);
+          retryCount++;
+        }
+      } catch (Exception e) {
+        throw new HoodieLockException("Unable to acquire lock ", e);
+      }
+      if (!acquired) {
+        throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock());
+      }
+    }
+  }
+
+  public void unlock() {
+    if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      getLockProvider().unlock();
Review comment:
Yes, both providers' underlying implementations return void but throw
exceptions.
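The unlock behaviour described here, a `void` call whose failures surface only as exceptions, can be sketched with a JDK `ReentrantLock` standing in for the lock providers (the wrapper exception is a stand-in for `HoodieLockException`):

```java
import java.util.concurrent.locks.ReentrantLock;

class LockReleaser {
    // unlock() returns void, so a failed release is signalled only via an
    // exception, which we wrap in a domain-specific exception for the caller.
    static void unlock(ReentrantLock lock) {
        try {
            lock.unlock(); // throws IllegalMonitorStateException if not held
        } catch (RuntimeException e) {
            throw new IllegalStateException("Unable to release lock", e);
        }
    }
}
```

The caller never inspects a return value; success is the absence of an exception.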
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for
concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+ implements ConflictResolutionStrategy {
+
+ private static final Logger LOG =
LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+ @Override
+ public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline
activeTimeline, HoodieInstant currentInstant,
+ Option<HoodieInstant>
lastSuccessfulInstant) {
+
+ // To find which instants are conflicting, we apply the following logic
+ // 1. Get completed instants timeline only for commits that have happened
since the last successful write.
+ // 2. Get any scheduled or completed compaction or clustering operations
that have started and/or finished
+ // after the current instant. We need to check for write conflicts since
they may have mutated the same files
+ // that are being newly created by the current write.
+ Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+ .getCommitsTimeline()
+ // TODO : getWriteTimeline to ensure we include replace commits as well
+ .filterCompletedInstants()
+ .findInstantsAfter(lastSuccessfulInstant.isPresent() ?
lastSuccessfulInstant.get().getTimestamp() : "0")
Review comment:
Used `HoodieTimeline.INIT_INSTANT_TS` now
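The fallback under discussion, scan from the last successful instant or else from the beginning of the timeline, reduces to an `Optional` default in plain Java. A sketch; the sentinel value below is an assumption, not necessarily the actual `HoodieTimeline.INIT_INSTANT_TS` constant:

```java
import java.util.Optional;

class InstantFallback {
    // Assumed sentinel meaning "before all instants"; stand-in for INIT_INSTANT_TS.
    static final String INIT_INSTANT_TS = "00000000000000";

    // Returns the timestamp of the last successful instant, or the init
    // sentinel so a findInstantsAfter()-style scan covers the whole timeline.
    static String startTs(Optional<String> lastSuccessfulTs) {
        return lastSuccessfulTs.orElse(INIT_INSTANT_TS);
    }
}
```

Using a named constant instead of a bare `"0"` makes the intent of the fallback explicit at the call site.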
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/LockManager.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class wraps implementations of {@link LockProvider} and provides an
easy way to manage the lifecycle of a lock.
+ */
+public class LockManager implements Serializable {
+
+ private static final Logger LOG = LogManager.getLogger(LockManager.class);
+ private final HoodieWriteConfig writeConfig;
+ private final LockConfiguration lockConfiguration;
+ private final SerializableConfiguration hadoopConf;
+ private volatile LockProvider lockProvider;
+ // Holds the latest completed write instant to know which ones to check
conflict against
+ private final AtomicReference<Option<HoodieInstant>>
latestCompletedWriteInstant;
+
+ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+ this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty());
+ this.writeConfig = writeConfig;
+ this.hadoopConf = new SerializableConfiguration(fs.getConf());
+ this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
+ }
+
+  public void lock() {
+    if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      LockProvider lockProvider = getLockProvider();
+      boolean acquired = false;
+      try {
+        int retries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP);
+        long waitTimeInMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP);
+        int retryCount = 0;
+        while (retryCount <= retries) {
+          acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
+          if (acquired) {
+            break;
+          }
+          LOG.info("Retrying...");
+          Thread.sleep(waitTimeInMs);
+          retryCount++;
+        }
+      } catch (Exception e) {
Review comment:
If the lock was accepted by the LockProvider server but an
InterruptedException occurs, then we rely on the fact that the lock will time out after X
minutes (settings in HiveMetastore & Zookeeper). I have tested it in my production
runs.
I have added some special checks for HiveMetastore in case of an
InterruptedException, but for Zookeeper it's not possible to do those checks.
```
acquired =
lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(),
TimeUnit.MILLISECONDS);
if (acquired) {
break;
}
```
Another extremely low probability is for the above code, lock is acquired
but the running thread gets Interrupted before it can break. Again in this
case, we just rely on the Lock Timeout on the server side of the LockProviders.
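The interrupt-handling trade-off described above can be sketched in plain Java: on `InterruptedException` the loop restores the interrupt flag and gives up, relying on the server-side lock timeout to release any lock that may already have been granted. `ReentrantLock` is a stand-in for the real providers:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class RetryLock {
    // Bounded retry; on interrupt, restore the flag and stop retrying so the
    // caller can rely on the server-side lock timeout to reclaim the lock.
    static boolean tryLockWithRetries(ReentrantLock lock, int retries, long waitMs) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                if (lock.tryLock(waitMs, TimeUnit.MILLISECONDS)) {
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
        return false;
    }
}
```

Restoring the interrupt status (rather than swallowing the exception) lets callers higher in the stack observe the cancellation.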
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
Review comment:
ZK & Curator come with hbase-server. Do you want me to add them
explicitly in the bundles?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.HoodieCommitOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class TransactionUtils {
+
+ private static final Logger LOG =
LogManager.getLogger(TransactionUtils.class);
+
+ /**
+ * Resolve any write conflicts when committing data.
+ * @param table
+ * @param metadataWriter
+ * @param currentTxnOwnerInstant
+ * @param thisCommitMetadata
+ * @param config
+ * @param lastCompletedTxnOwnerInstant
+ * @return
+ * @throws HoodieWriteConflictException
+ */
+ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final
HoodieTable table, final Option<HoodieBackedTableMetadataWriter> metadataWriter,
Review comment:
Replied above.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for
concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+ implements ConflictResolutionStrategy {
+
+ private static final Logger LOG =
LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+ @Override
+ public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline
activeTimeline, HoodieInstant currentInstant,
+ Option<HoodieInstant>
lastSuccessfulInstant) {
+
+ // To find which instants are conflicting, we apply the following logic
+ // 1. Get completed instants timeline only for commits that have happened
since the last successful write.
+ // 2. Get any scheduled or completed compaction or clustering operations
that have started and/or finished
+ // after the current instant. We need to check for write conflicts since
they may have mutated the same files
+ // that are being newly created by the current write.
+ Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+ .getCommitsTimeline()
+ // TODO : getWriteTimeline to ensure we include replace commits as well
+ .filterCompletedInstants()
+ .findInstantsAfter(lastSuccessfulInstant.isPresent() ?
lastSuccessfulInstant.get().getTimestamp() : "0")
+ .getInstants();
+
+ Stream<HoodieInstant> compactionAndClusteringTimeline = activeTimeline
+ .getTimelineOfActions(CollectionUtils.createSet(REPLACE_COMMIT_ACTION,
COMPACTION_ACTION))
+ .findInstantsAfter(currentInstant.getTimestamp())
+ .getInstants();
+ return Stream.concat(completedCommitsInstantStream,
compactionAndClusteringTimeline);
+ }
+
+ @Override
+ public boolean hasConflict(HoodieCommitOperation thisOperation,
HoodieCommitOperation otherOperation) {
+ // TODO : UUID's can clash even for insert/insert, handle that case.
+ Set<String> fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
+ Set<String> fileIdsSetForSecondInstant =
otherOperation.getMutatedFileIds();
+ Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
+ intersection.retainAll(fileIdsSetForSecondInstant);
+ if (!intersection.isEmpty()) {
+ LOG.error("Found conflicting writes between first operation = " +
thisOperation
+ + ", second operation = " + otherOperation + " , intersecting file
ids " + intersection);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Option<HoodieCommitMetadata>
resolveConflict(Option<HoodieBackedTableMetadataWriter> metadataWriter,
HoodieTable table,
+ HoodieCommitOperation
thisOperation, HoodieCommitOperation otherOperation) {
+    // NOTE that we skip any commits from table services such as compaction, clustering or cleaning, since the
+    // overlapping of files is handled using MVCC. Since compaction is eventually written as a commit, we need to ensure
+    // we handle this during conflict resolution and not treat the commit from a compaction operation as a regular commit.
+    if (otherOperation.getOperationType() == WriteOperationType.UNKNOWN
Review comment:
Good catch, I forgot that a replace commit can result from
INSERT_OVERWRITE as well; changed the logic to be more specific.
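Independent of how the operation type is classified, the file-id check that `hasConflict` performs boils down to a set intersection. A self-contained sketch of that core step:

```java
import java.util.HashSet;
import java.util.Set;

class ConflictCheck {
    // Two concurrent writes conflict when they mutated at least one common file id.
    static boolean hasConflict(Set<String> thisFiles, Set<String> otherFiles) {
        Set<String> intersection = new HashSet<>(thisFiles); // copy; retainAll mutates
        intersection.retainAll(otherFiles);
        return !intersection.isEmpty();
    }
}
```

Copying before `retainAll` matters: the incoming sets stay untouched for logging the conflicting file ids afterwards.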
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able
to use this lock.
+ */
+@NotThreadSafe
+public class ZookeeperBasedLockProvider extends LockProvider {
+
+ private static final Logger LOG =
LogManager.getLogger(ZookeeperBasedLockProvider.class);
+
+ private CuratorFramework curatorFrameworkClient;
+ private volatile InterProcessMutex lock = null;
+
+ public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ this(lockConfiguration);
+ this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
+
.connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP))
+ .retryPolicy(new
BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
+ 5000,
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP)))
+
.sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP,
DEFAULT_ZK_SESSION_TIMEOUT_MS))
+
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
+ .build();
+ this.curatorFrameworkClient.start();
+ }
+
+ public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration,
final CuratorFramework curatorFrameworkClient) {
+ this(lockConfiguration);
+ this.curatorFrameworkClient = curatorFrameworkClient;
+ synchronized (this.curatorFrameworkClient) {
+ if (this.curatorFrameworkClient.getState() !=
CuratorFrameworkState.STARTED) {
+ this.curatorFrameworkClient.start();
+ }
+ }
+ }
+
+ ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ }
+
+ public void acquireLock(long time, TimeUnit unit) throws Exception {
+ ValidationUtils.checkArgument(this.lock == null,
LockState.ALREADY_ACQUIRED.name());
+ InterProcessMutex newLock = new InterProcessMutex(
+ this.curatorFrameworkClient,
lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP) + "/"
+ + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP));
+ newLock.acquire(time, unit);
+ if (newLock.isAcquiredInThisProcess()) {
+ lock = newLock;
+ }
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
+ try {
+ acquireLock(time, unit);
+ LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
+ } catch (Exception e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
Review comment:
This is not the code on my local. Something went amiss during the rebase
and squash of commits on my local. I have re-done the merge to ensure nothing got
lost.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for
concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+ implements ConflictResolutionStrategy {
+
+ private static final Logger LOG =
LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+ @Override
+ public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline
activeTimeline, HoodieInstant currentInstant,
+ Option<HoodieInstant>
lastSuccessfulInstant) {
+
+ // To find which instants are conflicting, we apply the following logic
+ // 1. Get completed instants timeline only for commits that have happened
since the last successful write.
+ // 2. Get any scheduled or completed compaction or clustering operations
that have started and/or finished
+ // after the current instant. We need to check for write conflicts since
they may have mutated the same files
+ // that are being newly created by the current write.
+ Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+ .getCommitsTimeline()
+ // TODO : getWriteTimeline to ensure we include replace commits as well
+ .filterCompletedInstants()
+ .findInstantsAfter(lastSuccessfulInstant.isPresent() ?
lastSuccessfulInstant.get().getTimestamp() : "0")
+ .getInstants();
+
+ Stream<HoodieInstant> compactionAndClusteringTimeline = activeTimeline
+ .getTimelineOfActions(CollectionUtils.createSet(REPLACE_COMMIT_ACTION,
COMPACTION_ACTION))
+ .findInstantsAfter(currentInstant.getTimestamp())
+ .getInstants();
+ return Stream.concat(completedCommitsInstantStream,
compactionAndClusteringTimeline);
+ }
+
+ @Override
+ public boolean hasConflict(HoodieCommitOperation thisOperation,
HoodieCommitOperation otherOperation) {
+ // TODO : UUID's can clash even for insert/insert, handle that case.
+ Set<String> fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
+ Set<String> fileIdsSetForSecondInstant =
otherOperation.getMutatedFileIds();
+ Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
+ intersection.retainAll(fileIdsSetForSecondInstant);
+ if (!intersection.isEmpty()) {
+ LOG.error("Found conflicting writes between first operation = " +
thisOperation
Review comment:
I can make it `WARN`. This is useful for debugging issues, and INFO might get
ignored.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import
org.apache.hudi.client.lock.SimpleConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.lock.LockProvider;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS;
Review comment:
These configs are also used by `HiveMetastoreLockProvider`, which does
not depend on hudi-client-common. Hence these are in `hudi-common`.
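Reading such shared lock configs typically follows the `Properties`-with-default pattern. A sketch with a hypothetical key name; the real keys and defaults live in `LockConfiguration`:

```java
import java.util.Properties;

class LockConfigSketch {
    // Hypothetical property key and default, for illustration only.
    static final String WAIT_TIMEOUT_KEY = "hoodie.write.lock.wait_time_ms";
    static final int DEFAULT_WAIT_TIMEOUT_MS = 60_000;

    // Falls back to the default when the user did not set the property.
    static int getWaitTimeoutMs(Properties props) {
        return Integer.parseInt(
            props.getProperty(WAIT_TIMEOUT_KEY, String.valueOf(DEFAULT_WAIT_TIMEOUT_MS)));
    }
}
```

Keeping the keys and defaults in one shared module (here, hudi-common) is what lets providers in different modules read the same configuration without a circular dependency.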
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/HoodieMetadataConversionUtils.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.ReplaceArchivalHelper;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import java.io.IOException;
+
+/**
+ * Helper class to convert between different action related payloads and {@link HoodieArchivedMetaEntry}.
+ */
+public class HoodieMetadataConversionUtils {
+
+  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant, HoodieTableMetaClient metaClient) throws IOException {
+    HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
+    archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
+    archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+    switch (hoodieInstant.getAction()) {
+      case HoodieTimeline.CLEAN_ACTION: {
+        if (hoodieInstant.isCompleted()) {
+          archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, hoodieInstant));
+        } else {
+          archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, hoodieInstant));
+        }
+        archivedMetaWrapper.setActionType(ActionType.clean.name());
+        break;
+      }
+      case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.DELTA_COMMIT_ACTION: {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+            .fromBytes(metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
+        archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
+        archivedMetaWrapper.setActionType(ActionType.commit.name());
+        break;
+      }
+      case HoodieTimeline.REPLACE_COMMIT_ACTION: {
+        HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
+            .fromBytes(metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
+        archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
+        break;
+      }
+      case HoodieTimeline.ROLLBACK_ACTION: {
+        archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
+            metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieRollbackMetadata.class));
+        archivedMetaWrapper.setActionType(ActionType.rollback.name());
+        break;
+      }
+      case HoodieTimeline.SAVEPOINT_ACTION: {
+        archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
+            metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieSavepointMetadata.class));
+        archivedMetaWrapper.setActionType(ActionType.savepoint.name());
+        break;
+      }
+      case HoodieTimeline.COMPACTION_ACTION: {
+        HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, hoodieInstant.getTimestamp());
+        archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        archivedMetaWrapper.setActionType(ActionType.compaction.name());
+        break;
+      }
+      default: {
+        throw new UnsupportedOperationException("Action not fully supported yet");
+      }
+    }
+    return archivedMetaWrapper;
+  }
+
+  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant,
+                                                          HoodieCommitMetadata hoodieCommitMetadata) {
+    HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
+    archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
+    archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+    archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(hoodieCommitMetadata));
+    archivedMetaWrapper.setActionType(ActionType.commit.name());
+    return archivedMetaWrapper;
+  }
+
+  public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata(
+      HoodieCommitMetadata hoodieCommitMetadata) {
+    ObjectMapper mapper = new ObjectMapper();
+    // Need this to ignore other public get() methods
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    org.apache.hudi.avro.model.HoodieCommitMetadata avroMetaData =
+        mapper.convertValue(hoodieCommitMetadata, org.apache.hudi.avro.model.HoodieCommitMetadata.class);
+    // Do not archive Rolling Stats, cannot set to null since AVRO will throw null pointer
+    avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, "");
+    return avroMetaData;
+  }
+
+  // TODO : Fix converting from SpecificRecord to POJO
Review comment:
I've removed the method for now to be sure no one uses it.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -43,7 +43,8 @@
public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy";
public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
public static final String ASYNC_CLEAN_PROP = "hoodie.clean.async";
-
+ // Turn on inline cleaning
+ public static final String INLINE_CLEAN_PROP = "hoodie.clean.inline";
Review comment:
I was trying to keep the same `inline` concept used for clustering and compaction. The problem is there are `autoClean` & `autoCommit` but no `autoCompact` or `autoCluster` etc. Additionally, we used `inline` as the flag to toggle between inline & async for compaction & clustering, while we have chosen `async` as the flag for clean.
I have removed `hoodie.clean.inline`. We can address these in another PR after we decide which convention to follow.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -114,6 +115,7 @@
HoodieFailedWritesCleaningPolicy.EAGER.name();
private static final String DEFAULT_AUTO_CLEAN = "true";
private static final String DEFAULT_ASYNC_CLEAN = "false";
+ private static final String DEFAULT_INLINE_CLEAN = DEFAULT_AUTO_CLEAN;
Review comment:
done
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Configuration for managing locks. Since this configuration needs to be shared with the HiveMetaStore based lock,
+ * which is in a different package than other lock providers, we use this as a data transfer object in hoodie-common.
+ */
+public class LockConfiguration implements Serializable {
+
+  public static final String LOCK_PREFIX = "hoodie.writer.lock.";
+  public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "wait_time_ms_between_retry";
+  public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(5000L);
+  public static final String LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "client.wait_time_ms_between_retry";
+  public static final String DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(10000L);
+  public static final String LOCK_ACQUIRE_NUM_RETRIES_PROP = LOCK_PREFIX + "num_retries";
+  public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES = String.valueOf(3);
+  public static final String LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP = LOCK_PREFIX + "client.num_retries";
+  public static final String DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES = String.valueOf(0);
+  public static final String LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP = LOCK_PREFIX + "wait_time_ms";
+  public static final int DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS = 60 * 1000;
+  // configs for file system based locks. NOTE: This only works for DFS with atomic create/delete operation
+  public static final String FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "filesystem.";
+  public static final String FILESYSTEM_LOCK_PATH_PROP = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "path";
+  // configs for metastore based locks
+  public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "hivemetastore.";
+  public static final String HIVE_DATABASE_NAME_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "database";
+  public static final String HIVE_TABLE_NAME_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "table";
+  // Zookeeper configs for zk based locks
+  public static final String ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "zookeeper.";
+  public static final String ZK_BASE_PATH_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "zk_base_path";
Review comment:
This is the base path for the ZooKeeper-based lock, which users can configure. I have not exposed a config to change the chroot; this will be the default.
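To make the configuration surface concrete, here is a minimal sketch of how a user might supply these lock properties. The key names mirror `LockConfiguration` from the diff above, but `LockConfigDemo` and its helper are hypothetical stand-ins re-declared locally so the snippet compiles on its own, without Hudi on the classpath.

```java
import java.util.Properties;

public class LockConfigDemo {
  // Key names copied from the LockConfiguration constants in the diff.
  static final String LOCK_PREFIX = "hoodie.writer.lock.";
  static final String ZK_BASE_PATH_PROP = LOCK_PREFIX + "zookeeper.zk_base_path";
  static final String NUM_RETRIES_PROP = LOCK_PREFIX + "num_retries";
  static final String DEFAULT_NUM_RETRIES = "3";

  // Read a lock property, falling back to its documented default when unset.
  static String getOrDefault(Properties props, String key, String dflt) {
    return props.getProperty(key, dflt);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    // User selects the ZK base path; retries are left at the default.
    props.setProperty(ZK_BASE_PATH_PROP, "/hudi/locks/my_table");
    System.out.println(getOrDefault(props, ZK_BASE_PATH_PROP, "/hudi"));
    System.out.println(getOrDefault(props, NUM_RETRIES_PROP, DEFAULT_NUM_RETRIES));
  }
}
```

The same pattern applies to the Hive metastore and filesystem lock keys, which only differ in their prefix.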
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -66,6 +70,11 @@ public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig c
     this.operationType = operationType;
     this.extraMetadata = extraMetadata;
     this.taskContextSupplier = context.getTaskContextSupplier();
+    this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
+    // TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link AbstractHoodieWriteClient}.
Review comment:
https://issues.apache.org/jira/browse/HUDI-1665
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.HoodieCommitOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class TransactionUtils {
+
+  private static final Logger LOG = LogManager.getLogger(TransactionUtils.class);
+
+  /**
+   * Resolve any write conflicts when committing data.
+   * @param table
+   * @param metadataWriter
+   * @param currentTxnOwnerInstant
+   * @param thisCommitMetadata
+   * @param config
+   * @param lastCompletedTxnOwnerInstant
+   * @return
+   * @throws HoodieWriteConflictException
+   */
+  public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final HoodieTable table, final Option<HoodieBackedTableMetadataWriter> metadataWriter,
+      final Option<HoodieInstant> currentTxnOwnerInstant, final Option<HoodieCommitMetadata> thisCommitMetadata,
+      final HoodieWriteConfig config, Option<HoodieInstant> lastCompletedTxnOwnerInstant)
+      throws HoodieWriteConflictException {
+    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
+      Stream<HoodieInstant> instantStream = resolutionStrategy.getInstantsStream(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant);
+      // TODO : metadataWriter.reload() inside resolve write conflict ??
Review comment:
I put this TODO since we are going to need a way to use the MetadataWriter to handle any concurrently performed actions; we will address this use-case in a follow-up PR. I have removed the metadata writer for now.
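The shape of the conflict-resolution flow in `resolveWriteConflictIfAny` above can be sketched as: stream the instants completed concurrently with the current transaction, and abort on the first conflict. The types below (`ConflictChecker`, `WriteConflictException`) are toy stand-ins for Hudi's `ConflictResolutionStrategy` and `HoodieWriteConflictException`, not the real APIs.

```java
import java.util.Arrays;
import java.util.List;

public class ConflictLoopDemo {
  static class WriteConflictException extends RuntimeException {
    WriteConflictException(String msg) { super(msg); }
  }

  // Stand-in for the strategy's conflict check between two instants.
  interface ConflictChecker {
    boolean hasConflict(String currentInstant, String otherInstant);
  }

  // Throws as soon as any concurrently completed instant conflicts with ours.
  static void resolveWriteConflictIfAny(String currentInstant, List<String> concurrentInstants, ConflictChecker checker) {
    for (String other : concurrentInstants) {
      if (checker.hasConflict(currentInstant, other)) {
        throw new WriteConflictException("Conflict between " + currentInstant + " and " + other);
      }
    }
  }

  public static void main(String[] args) {
    // Toy checker: instants conflict when they are equal.
    ConflictChecker checker = (a, b) -> a.equals(b);
    resolveWriteConflictIfAny("c1", Arrays.asList("c2", "c3"), checker);
    System.out.println("no conflict, commit may proceed");
  }
}
```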
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/lock/LockProvider.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Pluggable lock implementations using this provider class.
+ */
+public abstract class LockProvider<T> implements Lock, AutoCloseable {
Review comment:
I want to mark some methods as unsupported, which is why an abstract class was chosen over an interface.
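A minimal sketch of that design: the abstract base implements `java.util.concurrent.locks.Lock` but hard-rejects methods that make no sense for an external lock service, while concrete providers supply the acquisition logic. `DemoLockProvider` and `InProcessLockProvider` are hypothetical names for illustration, not the real `org.apache.hudi.common.lock.LockProvider`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

abstract class DemoLockProvider implements Lock, AutoCloseable {
  // Condition variables do not map onto external lock services, so reject them.
  @Override
  public Condition newCondition() {
    throw new UnsupportedOperationException("newCondition is not supported");
  }
}

class InProcessLockProvider extends DemoLockProvider {
  private final ReentrantLock delegate = new ReentrantLock();

  @Override public void lock() { delegate.lock(); }
  @Override public void lockInterruptibly() throws InterruptedException { delegate.lockInterruptibly(); }
  @Override public boolean tryLock() { return delegate.tryLock(); }
  @Override public boolean tryLock(long time, TimeUnit unit) {
    try {
      return delegate.tryLock(time, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
  @Override public void unlock() { delegate.unlock(); }
  @Override public void close() { /* release any external resources here */ }
}
```

With an interface alone, every provider would have to repeat the `newCondition` rejection; the abstract class centralizes it.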
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
##########
@@ -82,6 +84,10 @@ public static WriteOperationType fromValue(String value) {
return INSERT_OVERWRITE_TABLE;
case "cluster":
return CLUSTER;
+ case "compact":
+ return COMPACT;
+ case "unknown":
Review comment:
This was introduced to handle older metadata where the WriteOperationType was not stored in the metadata.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Different concurrency modes for write operations.
+ */
+public enum WriteConcurrencyMode {
+  // Only a single writer can perform write ops
+  SINGLE_WRITER("single_writer"),
+  // Multiple writers can perform write ops with lazy conflict resolution using locks
+  OPTIMISTIC_CONCURRENCY_CONTROL("optimistic_concurrency_control");
+
+  private final String value;
+
+  WriteConcurrencyMode(String value) {
+    this.value = value;
+  }
+
+  /**
+   * Getter for write concurrency mode.
+   * @return
+   */
+  public String value() {
+    return value;
+  }
+
+  /**
+   * Convert string value to WriteConcurrencyMode.
+   */
+  public static WriteConcurrencyMode fromValue(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "single_writer":
+        return SINGLE_WRITER;
+      case "optimistic_concurrency_control":
+        return OPTIMISTIC_CONCURRENCY_CONTROL;
+      default:
+        throw new HoodieException("Invalid value of Type.");
+    }
+  }
+
+  public boolean supportsOptimisticConcurrencyControl() {
Review comment:
I prefer `supportsOptimisticConcurrencyControl` as it is more direct; `isOptimisticConcurrencyControl` sounds a little odd. Let me know if you have a strong preference.
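For reference, here is the enum from the diff above as a self-contained replica that compiles without Hudi, showing how `fromValue` and the method under discussion are used together. The body of `supportsOptimisticConcurrencyControl` is not shown in the diff, so the one-liner below is an assumption, and `IllegalArgumentException` stands in for `HoodieException`.

```java
import java.util.Locale;

enum WriteConcurrencyMode {
  // Only a single writer can perform write ops
  SINGLE_WRITER("single_writer"),
  // Multiple writers can perform write ops with lazy conflict resolution using locks
  OPTIMISTIC_CONCURRENCY_CONTROL("optimistic_concurrency_control");

  private final String value;

  WriteConcurrencyMode(String value) {
    this.value = value;
  }

  public String value() {
    return value;
  }

  // Convert a config string to WriteConcurrencyMode, case-insensitively.
  public static WriteConcurrencyMode fromValue(String value) {
    switch (value.toLowerCase(Locale.ROOT)) {
      case "single_writer":
        return SINGLE_WRITER;
      case "optimistic_concurrency_control":
        return OPTIMISTIC_CONCURRENCY_CONTROL;
      default:
        throw new IllegalArgumentException("Invalid value of Type.");
    }
  }

  // Assumed implementation: only OCC mode supports optimistic concurrency control.
  public boolean supportsOptimisticConcurrencyControl() {
    return this == OPTIMISTIC_CONCURRENCY_CONTROL;
  }
}
```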
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
-    return getTableAndInitCtx(metaClient, operationType);
+    if (HoodieTableVersion.current() != metaClient.getTableConfig().getTableVersion()) {
 Review comment:
     Added
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
-    return getTableAndInitCtx(metaClient, operationType);
+    if (HoodieTableVersion.current() != metaClient.getTableConfig().getTableVersion()) {
+      // TODO : Force clean up of all inflights, do this once pending rollback removal PR is landed
+      // this.rollbackFailedWrites();
+      this.txnManager.beginTransaction();
+      try {
+        // Ensure no inflight commits
+        TransactionUtils.resolveConflictIfAnyForUpgradeDowngrade(metaClient);
+        new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
+      } finally {
+        this.txnManager.endTransaction();
+      }
+    }
+    return getTableAndInitCtx(metaClient, operationType, instantTime);
   }
-  private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
+  // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
Review comment:
So basically, the following is a use-case for us in production:
1) A writer starts writing fresh data to files f1, f2; c1 is inflight
2) Clustering is scheduled as c2.cluster for files f1, f2
3) c1 and c2 are in progress concurrently
4) c2.cluster finishes
5) c1 attempts to finish, notices that c2 has overlapping file ids, and aborts
We want c1 to take priority over c2 to avoid violating our freshness SLA. A design and PR for this will follow after this PR lands.
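The abort condition in step 5 boils down to a file-id intersection check, which can be sketched as below. All names here (`FileIdConflictDemo`, `hasOverlappingFileIds`) are illustrative, not Hudi APIs; the real check lives in the conflict resolution strategy.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class FileIdConflictDemo {
  // The writer commit conflicts with the clustering commit when the
  // sets of file ids they touch intersect.
  static boolean hasOverlappingFileIds(Set<String> writerFileIds, Set<String> clusteringFileIds) {
    Set<String> overlap = new HashSet<>(writerFileIds);
    overlap.retainAll(clusteringFileIds);
    return !overlap.isEmpty();
  }

  public static void main(String[] args) {
    Set<String> c1 = new HashSet<>(Arrays.asList("f1", "f2")); // inflight writer
    Set<String> c2 = new HashSet<>(Arrays.asList("f1", "f2")); // completed clustering
    // true: with the current strategy c1 must abort
    System.out.println(hasOverlappingFileIds(c1, c2));
  }
}
```

The planned priority mechanism would change who yields when this check fires, not the check itself.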
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]