vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r587049910
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -30,16 +31,19 @@
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.lock.TransactionManager;
Review comment:
`lock` as a package name feels off to me. Can we have
`org.apache.hudi.client.transaction.TransactionManager`?
Then `.lock` can be a sub-package, i.e. `.transaction.lock`.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -30,16 +31,19 @@
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.lock.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableService;
Review comment:
So, the `model` package should just contain POJOs, i.e. data structure
objects. Let's move `TableService` elsewhere.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -210,6 +231,11 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
}
}
+ protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+ // no-op
+ // TODO : Conflict resolution is not support for Flink,Java engines
Review comment:
typo: not supported
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -797,7 +853,9 @@ public Boolean rollbackFailedWrites() {
* Performs a compaction operation on a table, serially before or after an insert/upsert action.
*/
protected Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
- Option<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
+ String schedulingCompactionInstant = HoodieActiveTimeline.createNewInstantTime();
Review comment:
rename: `compactionInstantTime`
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConflictResolutionStrategy.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.stream.Stream;
+
+/**
+ * Strategy interface for conflict resolution with multiple writers.
+ * Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
+ * writers are mutating the hoodie table.
+ */
+public interface ConflictResolutionStrategy {
+
+ /**
+ * Stream of instants to check conflicts against.
+ * @return
+ */
+ Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
+
+ /**
+ * Implementations of this method will determine whether a conflict exists between 2 commits.
+ * @param thisOperation
+ * @param otherOperation
+ * @return
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ boolean hasConflict(HoodieCommitOperation thisOperation, HoodieCommitOperation otherOperation);
Review comment:
this reads nicely :)
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
Review comment:
how are you testing this? I don't see curator added to any of the
bundles? Same for the zookeeper dependencies. This is a really really important
part
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able to use this lock.
+ */
+@NotThreadSafe
+public class ZookeeperBasedLockProvider extends LockProvider {
+
+ private static final Logger LOG = LogManager.getLogger(ZookeeperBasedLockProvider.class);
+
+ private CuratorFramework curatorFrameworkClient;
+ private volatile InterProcessMutex lock = null;
+
+ public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
+ this(lockConfiguration);
+ this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
+ .connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP))
+ .retryPolicy(new BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
+ 5000, lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP)))
+ .sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP, DEFAULT_ZK_SESSION_TIMEOUT_MS))
+ .connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP, DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
+ .build();
+ this.curatorFrameworkClient.start();
+ }
+
+ public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, final CuratorFramework curatorFrameworkClient) {
+ this(lockConfiguration);
+ this.curatorFrameworkClient = curatorFrameworkClient;
+ synchronized (this.curatorFrameworkClient) {
+ if (this.curatorFrameworkClient.getState() != CuratorFrameworkState.STARTED) {
+ this.curatorFrameworkClient.start();
+ }
+ }
+ }
+
+ ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ }
+
+ public void acquireLock(long time, TimeUnit unit) throws Exception {
Review comment:
why public
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -359,10 +388,21 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
* Common method containing steps to be performed before write (upsert/insert/...
* @param instantTime
* @param writeOperationType
+ * @param metaClient
*/
- protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
+ protected void preWrite(String instantTime, WriteOperationType writeOperationType,
+ HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
- syncTableMetadata();
+ this.txnManager.setLastCompletedTransaction(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+ .lastInstant());
+ LOG.info("Last Instant Cached by writer with instant " + instantTime + " is " + this.txnManager.getLastCompletedTransactionOwner());
+ this.txnManager.setTransactionOwner(Option.of(new HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)));
+ this.txnManager.beginTransaction();
Review comment:
can the above setters be passed to an overloaded `beginTransaction(..)`
call? Whenever we have these contracts where some setters must be called
ahead of a `beginTransaction()`, it makes for harder maintenance/reading.
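For illustration, one possible shape of such an overload (all names and types here are simplified stand-ins, not Hudi's actual `TransactionManager` API):

```java
import java.util.Optional;

// Hypothetical sketch: the owner instant and last completed instant are passed
// directly to beginTransaction(), so no setter-ordering contract is needed.
public class TxnOverloadSketch {
  static class TransactionManager {
    private Optional<String> currentOwner = Optional.empty();
    private Optional<String> lastCompleted = Optional.empty();

    // Overloaded entry point carrying the state that previously had to be
    // installed via setTransactionOwner()/setLastCompletedTransaction().
    void beginTransaction(Optional<String> owner, Optional<String> lastCompletedInstant) {
      this.currentOwner = owner;
      this.lastCompleted = lastCompletedInstant;
      // ... acquire the lock here ...
    }

    void endTransaction() {
      // ... release the lock here ...
      this.currentOwner = Optional.empty();
    }

    Optional<String> getTransactionOwner() {
      return currentOwner;
    }
  }

  public static void main(String[] args) {
    TransactionManager txn = new TransactionManager();
    txn.beginTransaction(Optional.of("20210301120000"), Optional.empty());
    System.out.println(txn.getTransactionOwner().orElse("none"));
    txn.endTransaction();
  }
}
```

With this shape the compiler enforces that the owner/last-completed state is supplied at the start of every transaction.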
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -114,6 +115,7 @@ HoodieFailedWritesCleaningPolicy.EAGER.name();
private static final String DEFAULT_AUTO_CLEAN = "true";
private static final String DEFAULT_ASYNC_CLEAN = "false";
+ private static final String DEFAULT_INLINE_CLEAN = DEFAULT_AUTO_CLEAN;
Review comment:
auto clean is different from the cleaning mode itself. lets just have an
assignment to the hardcoded string `"true"`?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.lock.SimpleConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.lock.LockProvider;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_PORT_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+
+/**
+ * Hoodie Configs for Locks.
+ */
+public class HoodieLockConfig extends DefaultHoodieConfig {
+
+ // Pluggable type of lock provider
+ public static final String LOCK_PROVIDER_CLASS_PROP = LOCK_PREFIX + "provider";
+ public static final String DEFAULT_LOCK_PROVIDER_CLASS = ZookeeperBasedLockProvider.class.getName();
+ // Pluggable strategies to use when resolving conflicts
+ public static final String WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP =
+ LOCK_PREFIX + "conflict.resolution.strategy";
+ public static final String DEFAULT_WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS =
+ SimpleConcurrentFileWritesConflictResolutionStrategy.class.getName();
+
+ private HoodieLockConfig(Properties props) {
+ super(props);
+ }
+
+ public static HoodieLockConfig.Builder newBuilder() {
+ return new HoodieLockConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private final Properties props = new Properties();
+
+ public HoodieLockConfig.Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.props.load(reader);
+ return this;
+ }
+ }
+
+ public HoodieLockConfig.Builder fromProperties(Properties props) {
+ this.props.putAll(props);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withLockProvider(Class<? extends LockProvider> lockProvider) {
+ props.setProperty(LOCK_PROVIDER_CLASS_PROP, lockProvider.getName());
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withHiveDatabaseName(String databaseName) {
+ props.setProperty(HIVE_DATABASE_NAME_PROP, databaseName);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withHiveTableName(String tableName) {
+ props.setProperty(HIVE_TABLE_NAME_PROP, tableName);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkQuorum(String zkQuorum) {
+ props.setProperty(ZK_CONNECT_URL_PROP, zkQuorum);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkBasePath(String zkBasePath) {
+ props.setProperty(ZK_BASE_PATH_PROP, zkBasePath);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkPort(String zkPort) {
+ props.setProperty(ZK_PORT_PROP, zkPort);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkLockKey(String zkLockKey) {
+ props.setProperty(ZK_LOCK_KEY_PROP, zkLockKey);
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkConnectionTimeoutInMs(Long connectionTimeoutInMs) {
+ props.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP, String.valueOf(connectionTimeoutInMs));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withZkSessionTimeoutInMs(Long sessionTimeoutInMs) {
+ props.setProperty(ZK_SESSION_TIMEOUT_MS_PROP, String.valueOf(sessionTimeoutInMs));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withNumRetries(int numRetries) {
+ props.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP, String.valueOf(numRetries));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withRetryWaitTimeInMillis(Long retryWaitTimeInMillis) {
+ props.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(retryWaitTimeInMillis));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withClientNumRetries(int clientNumRetries) {
+ props.setProperty(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP, String.valueOf(clientNumRetries));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withClientRetryWaitTimeInMillis(Long clientRetryWaitTimeInMillis) {
+ props.setProperty(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(clientRetryWaitTimeInMillis));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withLockWaitTimeInMillis(Long waitTimeInMillis) {
+ props.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP, String.valueOf(waitTimeInMillis));
+ return this;
+ }
+
+ public HoodieLockConfig.Builder withConflictResolutionStrategy(ConflictResolutionStrategy conflictResolutionStrategy) {
+ props.setProperty(WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP, conflictResolutionStrategy.getClass().getName());
+ return this;
+ }
+
+ public HoodieLockConfig build() {
Review comment:
given we have had some typo-related issues recently, please check each
line once for correctness
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -66,6 +70,11 @@ public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig c
this.operationType = operationType;
this.extraMetadata = extraMetadata;
this.taskContextSupplier = context.getTaskContextSupplier();
+ this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
+ // TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link AbstractHoodieWriteClient}.
Review comment:
let's create a code cleanup JIRA for this, else we may not get to it.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -193,8 +200,22 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
return true;
}
+ protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
+ List<HoodieWriteStat> stats) throws IOException {
+ preCommit(instantTime, metadata);
Review comment:
let's move the `preCommit()` call out of here? `commit()` calling
`preCommit()` is a bit confusing to read
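One way to read the suggestion, as a minimal sketch (method bodies are stand-ins, not Hudi code): the caller sequences the two steps explicitly instead of `commit()` invoking `preCommit()` internally.

```java
// Illustrative only: hoisting preCommit() out of commit() so the call order is
// explicit and top-down at the call site.
public class CommitFlowSketch {
  static final StringBuilder TRACE = new StringBuilder();

  static void preCommit(String instantTime) {
    // conflict resolution / validation would happen here
    TRACE.append("preCommit;");
  }

  static void commit(String instantTime) {
    // commit() no longer calls preCommit() internally
    TRACE.append("commit;");
  }

  public static void main(String[] args) {
    String instantTime = "20210301120000";
    preCommit(instantTime); // caller sequences the two steps explicitly
    commit(instantTime);
    System.out.println(TRACE);
  }
}
```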
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -389,29 +429,33 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
// Delete the marker directory for the instant.
new MarkerFiles(table, instantTime).quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- // Do an inline compaction if enabled
- if (config.isInlineCompaction()) {
- runAnyPendingCompactions(table);
- metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
- inlineCompact(extraMetadata);
- } else {
- metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
- }
+ if (config.inlineTableServices()) {
+ // Do an inline compaction if enabled
+ if (config.inlineCompactionEnabled()) {
+ runAnyPendingCompactions(table);
+ metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
+ inlineCompact(extraMetadata);
+ } else {
+ metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
+ }
- // Do an inline clustering if enabled
- if (config.isInlineClustering()) {
- runAnyPendingClustering(table);
- metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true");
- inlineCluster(extraMetadata);
- } else {
- metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "false");
+ // Do an inline clustering if enabled
+ if (config.inlineClusteringEnabled()) {
+ runAnyPendingClustering(table);
+ metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true");
+ inlineCluster(extraMetadata);
+ } else {
+ metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "false");
duplicate line?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConflictResolutionStrategy.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.stream.Stream;
+
+/**
+ * Strategy interface for conflict resolution with multiple writers.
+ * Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
+ * writers are mutating the hoodie table.
+ */
+public interface ConflictResolutionStrategy {
+
+ /**
+ * Stream of instants to check conflicts against.
+ * @return
+ */
+ Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
Review comment:
rename: getCandidateInstants() to clarify intent.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -359,10 +388,21 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
* Common method containing steps to be performed before write (upsert/insert/...
* @param instantTime
* @param writeOperationType
+ * @param metaClient
*/
- protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
+ protected void preWrite(String instantTime, WriteOperationType writeOperationType,
+ HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
- syncTableMetadata();
+ this.txnManager.setLastCompletedTransaction(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+ .lastInstant());
+ LOG.info("Last Instant Cached by writer with instant " + instantTime + " is " + this.txnManager.getLastCompletedTransactionOwner());
+ this.txnManager.setTransactionOwner(Option.of(new HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)));
+ this.txnManager.beginTransaction();
+ try {
+ syncTableMetadata();
+ } finally {
+ this.txnManager.endTransaction();
Review comment:
do we need to distinguish between `endTransaction()` and an abort? i.e.
any cleanups to be done in the transaction manager here upon exception?
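A hypothetical sketch of what such a split could look like (names are illustrative, not Hudi's actual API): `endTransaction()` for normal completion, a separate `abortTransaction()` hook for cleanup on exception.

```java
// Illustrative only: separating the happy path from the failure path.
public class TxnAbortSketch {
  static class TransactionManager {
    String lastOutcome = "none";

    void beginTransaction() { /* acquire lock */ }

    void endTransaction() {   // normal completion: release lock, keep state
      lastOutcome = "committed";
    }

    void abortTransaction() { // failure: release lock plus extra cleanup
      lastOutcome = "aborted";
    }
  }

  public static void main(String[] args) {
    TransactionManager txn = new TransactionManager();
    txn.beginTransaction();
    try {
      throw new IllegalStateException("syncTableMetadata failed");
    } catch (IllegalStateException e) {
      txn.abortTransaction(); // distinct from endTransaction()
    }
    System.out.println(txn.lastOutcome);
  }
}
```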
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConflictResolutionStrategy.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.stream.Stream;
+
+/**
+ * Strategy interface for conflict resolution with multiple writers.
+ * Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
+ * writers are mutating the hoodie table.
+ */
+public interface ConflictResolutionStrategy {
+
+ /**
+ * Stream of instants to check conflicts against.
+ * @return
+ */
+ Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
+
+ /**
+ * Implementations of this method will determine whether a conflict exists between 2 commits.
+ * @param thisOperation
+ * @param otherOperation
+ * @return
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ boolean hasConflict(HoodieCommitOperation thisOperation, HoodieCommitOperation otherOperation);
+
+ /**
+ * Implementations of this method will determine how to resolve a conflict between 2 commits.
+ * @param thisOperation
+ * @param otherOperation
+ * @return
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ Option<HoodieCommitMetadata> resolveConflict(Option<HoodieBackedTableMetadataWriter> metadataWriter, HoodieTable table,
Review comment:
having the metadata writer passed in, feels off. any way to avoid this?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/LockManager.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class wraps implementations of {@link LockProvider} and provides an easy way to manage the lifecycle of a lock.
+ */
+public class LockManager implements Serializable {
+
+ private static final Logger LOG = LogManager.getLogger(LockManager.class);
+ private final HoodieWriteConfig writeConfig;
+ private final LockConfiguration lockConfiguration;
+ private final SerializableConfiguration hadoopConf;
+ private volatile LockProvider lockProvider;
+ // Holds the latest completed write instant to know which ones to check conflict against
+ private final AtomicReference<Option<HoodieInstant>> latestCompletedWriteInstant;
+
+ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+ this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty());
+ this.writeConfig = writeConfig;
+ this.hadoopConf = new SerializableConfiguration(fs.getConf());
+ this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
+ }
+
+ public void lock() {
+ if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ LockProvider lockProvider = getLockProvider();
+ boolean acquired = false;
+ try {
+ int retries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP);
+ long waitTimeInMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP);
+ int retryCount = 0;
+ while (retryCount <= retries) {
+ acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
+ if (acquired) {
+ break;
+ }
+ LOG.info("Retrying...");
+ Thread.sleep(waitTimeInMs);
+ retryCount++;
+ }
+ } catch (Exception e) {
Review comment:
Any special handling for InterruptedException? This is a common cause of bugs in such locking code paths.
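One way to address this, sketched here with `java.util.concurrent.locks.ReentrantLock` standing in for the `LockProvider` interface (so this is an illustration, not the PR's actual API), is to catch `InterruptedException` separately and restore the thread's interrupt status instead of folding it into the generic catch:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class InterruptAwareLock {
    // Sketch: a retry loop that restores the interrupt flag instead of
    // swallowing InterruptedException. ReentrantLock is a stand-in for a
    // hypothetical lock provider; all names here are illustrative only.
    public static boolean tryAcquire(ReentrantLock lock, int retries, long waitMs) {
        int attempt = 0;
        while (attempt <= retries) {
            try {
                if (lock.tryLock(waitMs, TimeUnit.MILLISECONDS)) {
                    return true;
                }
                Thread.sleep(waitMs);
            } catch (InterruptedException e) {
                // Restore the flag so callers up the stack can observe it,
                // rather than rethrowing as a generic lock exception.
                Thread.currentThread().interrupt();
                return false;
            }
            attempt++;
        }
        return false;
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        // Uncontended lock acquires on the first attempt.
        System.out.println(tryAcquire(lock, 3, 10));
    }
}
```

The distinction matters because a generic `catch (Exception e)` turns a cooperative cancellation signal into a hard failure and loses the interrupt status.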
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/TransactionManager.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+
+/**
+ * This class allows clients to start and end transactions. Anything done between a start and end transaction is
+ * guaranteed to be atomic.
+ */
+public class TransactionManager implements Serializable {
+
+ private static final Logger LOG = LogManager.getLogger(TransactionManager.class);
+
+ private final LockManager lockManager;
+ private Option<HoodieInstant> currentTxnOwnerInstant;
+ private Option<HoodieInstant> lastCompletedTxnOwnerInstant;
+
+ public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
+ this.lockManager = new LockManager(config, fs);
+ }
+
+ public synchronized void setLastCompletedTransaction(Option<HoodieInstant> instant) {
+ this.lastCompletedTxnOwnerInstant = instant;
+ lockManager.compareAndSetLatestCompletedWriteInstant(lockManager.getLatestCompletedWriteInstant().get(), instant);
+ LOG.info("Latest completed transaction instant " + instant);
+ }
+
+ public synchronized void setTransactionOwner(Option<HoodieInstant> instant) {
+ this.currentTxnOwnerInstant = instant;
+ LOG.info("Current transaction instant " + instant);
+ }
+
+ public synchronized void beginTransaction() {
+ LOG.info("Transaction starting");
Review comment:
Combine into a single log line?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/HoodieCommitOperation.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is used to hold all information used to identify how to resolve conflicts between instants.
+ * Since we interchange payload types between AVRO specific records and POJOs, this object serves as
+ * a common payload to manage these conversions.
+ */
+public class HoodieCommitOperation {
Review comment:
rename: ConflictingOperation
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for concurrent writes, {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+ implements ConflictResolutionStrategy {
+
+ private static final Logger LOG = LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+ @Override
+ public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant,
+                                                Option<HoodieInstant> lastSuccessfulInstant) {
+
+ // To find which instants are conflicting, we apply the following logic
+ // 1. Get completed instants timeline only for commits that have happened since the last successful write.
+ // 2. Get any scheduled or completed compaction or clustering operations that have started and/or finished
+ // after the current instant. We need to check for write conflicts since they may have mutated the same files
+ // that are being newly created by the current write.
+ Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+ .getCommitsTimeline()
+ // TODO : getWriteTimeline to ensure we include replace commits as well
+ .filterCompletedInstants()
+ .findInstantsAfter(lastSuccessfulInstant.isPresent() ? lastSuccessfulInstant.get().getTimestamp() : "0")
Review comment:
Can we avoid passing the `"0"`, or use an existing constant for the init instant time, etc.?
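A sketch of that suggestion, using `java.util.Optional` and an illustrative init-instant constant (the actual constant name and value in Hudi's timeline class may differ):

```java
import java.util.Optional;

public class InitInstant {
    // Illustrative stand-in for a named init-instant constant; the real
    // constant in Hudi's HoodieTimeline class may have a different name.
    static final String INIT_INSTANT_TS = "00000000000000";

    // A named constant makes the intent ("from the beginning of the
    // timeline") explicit, instead of the bare "0" sentinel.
    static String findAfterTimestamp(Optional<String> lastSuccessfulTs) {
        return lastSuccessfulTs.orElse(INIT_INSTANT_TS);
    }

    public static void main(String[] args) {
        System.out.println(findAfterTimestamp(Optional.empty()));
        System.out.println(findAfterTimestamp(Optional.of("20210305101500")));
    }
}
```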
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/LockManager.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class wraps implementations of {@link LockProvider} and provides an easy way to manage the lifecycle of a lock.
+ */
+public class LockManager implements Serializable {
+
+ private static final Logger LOG = LogManager.getLogger(LockManager.class);
+ private final HoodieWriteConfig writeConfig;
+ private final LockConfiguration lockConfiguration;
+ private final SerializableConfiguration hadoopConf;
+ private volatile LockProvider lockProvider;
+ // Holds the latest completed write instant to know which ones to check conflict against
+ private final AtomicReference<Option<HoodieInstant>> latestCompletedWriteInstant;
+
+ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+ this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty());
+ this.writeConfig = writeConfig;
+ this.hadoopConf = new SerializableConfiguration(fs.getConf());
+ this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
+ }
+
+ public void lock() {
+ if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ LockProvider lockProvider = getLockProvider();
+ boolean acquired = false;
+ try {
+ int retries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP);
+ long waitTimeInMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP);
+ int retryCount = 0;
+ while (retryCount <= retries) {
+ acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
+ if (acquired) {
+ break;
+ }
+ LOG.info("Retrying...");
+ Thread.sleep(waitTimeInMs);
+ retryCount++;
+ }
+ } catch (Exception e) {
+ throw new HoodieLockException("Unable to acquire lock ", e);
+ }
+ if (!acquired) {
+ throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock());
+ }
+ }
+ }
+
+ public void unlock() {
+ if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ getLockProvider().unlock();
Review comment:
I assume the provider's unlock() will throw more exceptions.
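If so, a small wrapper could normalize them into a single lock-release failure. This is a sketch with a minimal stand-in `LockProvider` interface, not the actual Hudi type:

```java
public class SafeUnlock {
    // Minimal stand-in for the provider abstraction; illustrative only.
    interface LockProvider {
        void unlock();
    }

    // Sketch: wrap the provider's unlock() so provider-specific runtime
    // exceptions surface as one well-typed failure instead of leaking
    // curator/zookeeper/etc. internals to callers.
    static void safeUnlock(LockProvider provider) {
        try {
            provider.unlock();
        } catch (RuntimeException e) {
            throw new IllegalStateException("Unable to release lock", e);
        }
    }

    public static void main(String[] args) {
        safeUnlock(() -> { });
        System.out.println("released");
    }
}
```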
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/TransactionManager.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+
+/**
+ * This class allows clients to start and end transactions. Anything done between a start and end transaction is
+ * guaranteed to be atomic.
+ */
+public class TransactionManager implements Serializable {
+
+ private static final Logger LOG = LogManager.getLogger(TransactionManager.class);
+
+ private final LockManager lockManager;
+ private Option<HoodieInstant> currentTxnOwnerInstant;
+ private Option<HoodieInstant> lastCompletedTxnOwnerInstant;
+
+ public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
+ this.lockManager = new LockManager(config, fs);
+ }
+
+ public synchronized void setLastCompletedTransaction(Option<HoodieInstant> instant) {
+ this.lastCompletedTxnOwnerInstant = instant;
+ lockManager.compareAndSetLatestCompletedWriteInstant(lockManager.getLatestCompletedWriteInstant().get(), instant);
+ LOG.info("Latest completed transaction instant " + instant);
+ }
+
+ public synchronized void setTransactionOwner(Option<HoodieInstant> instant) {
+ this.currentTxnOwnerInstant = instant;
+ LOG.info("Current transaction instant " + instant);
+ }
+
+ public synchronized void beginTransaction() {
+ LOG.info("Transaction starting");
+ LOG.info("Transaction Owner " + currentTxnOwnerInstant);
+ lockManager.lock();
+ LOG.info("Transaction started");
+ }
+
+ public synchronized void endTransaction() {
+ LOG.info("Transaction ending");
Review comment:
Same, combine?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for concurrent writes, {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+ implements ConflictResolutionStrategy {
+
+ private static final Logger LOG = LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+ @Override
+ public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant,
+                                                Option<HoodieInstant> lastSuccessfulInstant) {
+
+ // To find which instants are conflicting, we apply the following logic
+ // 1. Get completed instants timeline only for commits that have happened since the last successful write.
+ // 2. Get any scheduled or completed compaction or clustering operations that have started and/or finished
+ // after the current instant. We need to check for write conflicts since they may have mutated the same files
+ // that are being newly created by the current write.
+ Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+ .getCommitsTimeline()
+ // TODO : getWriteTimeline to ensure we include replace commits as well
+ .filterCompletedInstants()
+ .findInstantsAfter(lastSuccessfulInstant.isPresent() ? lastSuccessfulInstant.get().getTimestamp() : "0")
+ .getInstants();
+
+ Stream<HoodieInstant> compactionAndClusteringTimeline = activeTimeline
+ .getTimelineOfActions(CollectionUtils.createSet(REPLACE_COMMIT_ACTION, COMPACTION_ACTION))
+ .findInstantsAfter(currentInstant.getTimestamp())
+ .getInstants();
+ return Stream.concat(completedCommitsInstantStream, compactionAndClusteringTimeline);
+ }
+
+ @Override
+ public boolean hasConflict(HoodieCommitOperation thisOperation, HoodieCommitOperation otherOperation) {
+ // TODO : UUID's can clash even for insert/insert, handle that case.
+ Set<String> fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
+ Set<String> fileIdsSetForSecondInstant = otherOperation.getMutatedFileIds();
+ Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
+ intersection.retainAll(fileIdsSetForSecondInstant);
+ if (!intersection.isEmpty()) {
+ LOG.error("Found conflicting writes between first operation = " + thisOperation
Review comment:
Is this an error though? Can we move it to INFO?
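For context, the core of the strategy's check is a plain set intersection over mutated file IDs: two writes conflict iff they touched at least one common file group. A self-contained sketch of that logic:

```java
import java.util.HashSet;
import java.util.Set;

public class FileIdConflict {
    // Sketch of the intersection-based check the strategy performs; the
    // real implementation works on HoodieCommitOperation payloads.
    static boolean hasConflict(Set<String> firstMutatedFileIds, Set<String> secondMutatedFileIds) {
        Set<String> intersection = new HashSet<>(firstMutatedFileIds);
        intersection.retainAll(secondMutatedFileIds);
        return !intersection.isEmpty();
    }

    public static void main(String[] args) {
        Set<String> a = new HashSet<>(Set.of("file-1", "file-2"));
        Set<String> b = new HashSet<>(Set.of("file-2", "file-3"));
        Set<String> c = new HashSet<>(Set.of("file-4"));
        System.out.println(hasConflict(a, b)); // shared "file-2"
        System.out.println(hasConflict(a, c)); // disjoint sets
    }
}
```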
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A ZooKeeper-based lock. This {@link LockProvider} implementation allows locking table operations
+ * using ZooKeeper. Users need to have a ZooKeeper cluster deployed to be able to use this lock.
+ */
+@NotThreadSafe
+public class ZookeeperBasedLockProvider extends LockProvider {
+
+ private static final Logger LOG = LogManager.getLogger(ZookeeperBasedLockProvider.class);
+
+ private CuratorFramework curatorFrameworkClient;
Review comment:
Make it `final`. In general, can you make a pass to ensure that whatever can be final is made final?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Different concurrency modes for write operations.
+ */
+public enum WriteConcurrencyMode {
+ // Only a single writer can perform write ops
+ SINGLE_WRITER("single_writer"),
+ // Multiple writers can perform write ops with lazy conflict resolution using locks
+ OPTIMISTIC_CONCURRENCY_CONTROL("optimistic_concurrency_control");
+
+ private final String value;
+
+ WriteConcurrencyMode(String value) {
+ this.value = value;
+ }
+
+ /**
+ * Getter for write concurrency mode.
+ * @return the string value of this write concurrency mode
+ */
+ public String value() {
+ return value;
+ }
+
+ /**
+ * Convert string value to WriteConcurrencyMode.
+ */
+ public static WriteConcurrencyMode fromValue(String value) {
+ switch (value.toLowerCase(Locale.ROOT)) {
+ case "single_writer":
+ return SINGLE_WRITER;
+ case "optimistic_concurrency_control":
+ return OPTIMISTIC_CONCURRENCY_CONTROL;
+ default:
+ throw new HoodieException("Invalid value of WriteConcurrencyMode: " + value);
+ }
+ }
+
+ public boolean supportsOptimisticConcurrencyControl() {
Review comment:
rename: isOptimistic... ?
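As a side note on this enum, the hand-written switch in `fromValue` could also be replaced by `Enum.valueOf`, since the string values mirror the constant names. A sketch with a local stand-in enum (not the actual Hudi class):

```java
import java.util.Locale;

public class ConcurrencyMode {
    // Local stand-in mirroring the enum under review; illustrative only.
    enum WriteConcurrencyMode { SINGLE_WRITER, OPTIMISTIC_CONCURRENCY_CONTROL }

    // Deriving the mode from the enum name avoids keeping the switch in
    // sync with new constants, and keeps the error message informative.
    static WriteConcurrencyMode fromValue(String value) {
        try {
            return WriteConcurrencyMode.valueOf(value.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Invalid WriteConcurrencyMode: " + value, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fromValue("single_writer"));
        System.out.println(fromValue("optimistic_concurrency_control"));
    }
}
```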
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/HoodieMetadataConversionUtils.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.ReplaceArchivalHelper;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import java.io.IOException;
+
+/**
+ * Helper class to convert between different action related payloads and {@link HoodieArchivedMetaEntry}.
+ */
+public class HoodieMetadataConversionUtils {
Review comment:
Does this have a unit test of its own?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
@Override
protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
- new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
- return getTableAndInitCtx(metaClient, operationType);
+ if (HoodieTableVersion.current() != metaClient.getTableConfig().getTableVersion()) {
+ // TODO : Force clean up of all inflights, do this once pending rollback removal PR is landed
+ // this.rollbackFailedWrites();
+ this.txnManager.beginTransaction();
Review comment:
should we check concurrency mode before taking these locks?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
##########
@@ -82,6 +84,10 @@ public static WriteOperationType fromValue(String value) {
return INSERT_OVERWRITE_TABLE;
case "cluster":
return CLUSTER;
+ case "compact":
+ return COMPACT;
+ case "unknown":
Review comment:
What is this really? How can there be a write that is unknown?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A ZooKeeper-based lock. This {@link LockProvider} implementation allows locking table operations
+ * using ZooKeeper. Users need to have a ZooKeeper cluster deployed to be able to use this lock.
+ */
+@NotThreadSafe
Review comment:
but we seem to synchronize down below?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
-    return getTableAndInitCtx(metaClient, operationType);
+    if (HoodieTableVersion.current() != metaClient.getTableConfig().getTableVersion()) {
Review comment:
let's add a method `needsUpgradeOrDowngrade()` to the upgrade/downgrade class?
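The suggested helper could look roughly like this. A minimal, self-contained sketch only: `HoodieTableVersion` is modeled here as a plain enum, and the real method would live on Hudi's actual upgrade/downgrade class with its real version type.

```java
// Sketch of the reviewer's suggestion: hide the version comparison behind a
// needsUpgradeOrDowngrade() method instead of inlining it at call sites.
// HoodieTableVersion is a stand-in enum for illustration, not Hudi's class.
public class UpgradeDowngradeSketch {

  enum HoodieTableVersion {
    ZERO, ONE, TWO;

    // The version the currently running code writes.
    static HoodieTableVersion current() {
      return TWO;
    }
  }

  private final HoodieTableVersion tableVersion;

  public UpgradeDowngradeSketch(HoodieTableVersion tableVersion) {
    this.tableVersion = tableVersion;
  }

  // True when the table's on-disk version differs from the code's current version.
  public boolean needsUpgradeOrDowngrade() {
    return HoodieTableVersion.current() != tableVersion;
  }
}
```

Call sites would then read `if (upgradeDowngrade.needsUpgradeOrDowngrade()) { ... }` instead of repeating the comparison.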
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able
to use this lock.
+ */
+@NotThreadSafe
+public class ZookeeperBasedLockProvider extends LockProvider {
+
+  private static final Logger LOG = LogManager.getLogger(ZookeeperBasedLockProvider.class);
+
+  private CuratorFramework curatorFrameworkClient;
+  private volatile InterProcessMutex lock = null;
+
+  public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
+    this(lockConfiguration);
+    this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
+        .connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP))
+        .retryPolicy(new BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
+            5000, lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP)))
+        .sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP, DEFAULT_ZK_SESSION_TIMEOUT_MS))
+        .connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP, DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
+        .build();
+    this.curatorFrameworkClient.start();
+  }
+
+  public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, final CuratorFramework curatorFrameworkClient) {
+    this(lockConfiguration);
+    this.curatorFrameworkClient = curatorFrameworkClient;
+    synchronized (this.curatorFrameworkClient) {
+      if (this.curatorFrameworkClient.getState() != CuratorFrameworkState.STARTED) {
+        this.curatorFrameworkClient.start();
+      }
+    }
+  }
+
+  ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration) {
+    checkRequiredProps(lockConfiguration);
+    this.lockConfiguration = lockConfiguration;
+  }
+
+  public void acquireLock(long time, TimeUnit unit) throws Exception {
+    ValidationUtils.checkArgument(this.lock == null, LockState.ALREADY_ACQUIRED.name());
+    InterProcessMutex newLock = new InterProcessMutex(
+        this.curatorFrameworkClient, lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP) + "/"
+        + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP));
+    newLock.acquire(time, unit);
+    if (newLock.isAcquiredInThisProcess()) {
+      lock = newLock;
+    }
+  }
+
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) {
+    LOG.info(generateLogStatement(LockState.ACQUIRING, generateLogSuffixString()));
+    try {
+      acquireLock(time, unit);
+      LOG.info(generateLogStatement(LockState.ACQUIRED, generateLogSuffixString()));
+    } catch (Exception e) {
+      throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, generateLogSuffixString()), e);
Review comment:
so, the lock can be acquired by another process and we don't raise this exception? Is that ok? I see lines 94-96 above, where we simply do the assignment and don't raise any exception from an `else` block.
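One way to address the reviewer's point is to raise from an `else` branch when the acquire times out, so a silent failure can't be mistaken for success. A hypothetical, self-contained sketch only, not the PR's code: the ZK mutex is modeled with a plain `ReentrantLock` and `IllegalStateException` stands in for `HoodieLockException`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: fail loudly when the lock is not acquired within the timeout,
// instead of returning with the lock field silently left unset.
public class FailFastLockSketch {
  private final ReentrantLock mutex = new ReentrantLock(); // stand-in for InterProcessMutex
  private volatile boolean held = false;

  public void acquireLock(long time, TimeUnit unit) {
    try {
      if (mutex.tryLock(time, unit)) {
        held = true;
      } else {
        // The reviewer's point: this else branch is what the PR is missing.
        throw new IllegalStateException("FAILED_TO_ACQUIRE: lock held by another process");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while acquiring lock", e);
    }
  }

  public boolean isHeld() {
    return held;
  }
}
```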
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/HoodieMetadataConversionUtils.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.ReplaceArchivalHelper;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import java.io.IOException;
+
+/**
+ * Helper class to convert between different action related payloads and
{@link HoodieArchivedMetaEntry}.
+ */
+public class HoodieMetadataConversionUtils {
+
+  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant, HoodieTableMetaClient metaClient) throws IOException {
+    HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
+    archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
+    archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+    switch (hoodieInstant.getAction()) {
+      case HoodieTimeline.CLEAN_ACTION: {
+        if (hoodieInstant.isCompleted()) {
+          archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, hoodieInstant));
+        } else {
+          archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, hoodieInstant));
+        }
+        archivedMetaWrapper.setActionType(ActionType.clean.name());
+        break;
+      }
+      case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.DELTA_COMMIT_ACTION: {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+            .fromBytes(metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
+        archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
+        archivedMetaWrapper.setActionType(ActionType.commit.name());
+        break;
+      }
+      case HoodieTimeline.REPLACE_COMMIT_ACTION: {
+        HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
+            .fromBytes(metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
+        archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
+        break;
+      }
+      case HoodieTimeline.ROLLBACK_ACTION: {
+        archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
+            metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieRollbackMetadata.class));
+        archivedMetaWrapper.setActionType(ActionType.rollback.name());
+        break;
+      }
+      case HoodieTimeline.SAVEPOINT_ACTION: {
+        archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
+            metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieSavepointMetadata.class));
+        archivedMetaWrapper.setActionType(ActionType.savepoint.name());
+        break;
+      }
+      case HoodieTimeline.COMPACTION_ACTION: {
+        HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, hoodieInstant.getTimestamp());
+        archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        archivedMetaWrapper.setActionType(ActionType.compaction.name());
+        break;
+      }
+      default: {
+        throw new UnsupportedOperationException("Action not fully supported yet");
+      }
+    }
+    return archivedMetaWrapper;
+  }
+
+  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant,
+                                                          HoodieCommitMetadata hoodieCommitMetadata) {
+    HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
+    archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
+    archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+    archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(hoodieCommitMetadata));
+    archivedMetaWrapper.setActionType(ActionType.commit.name());
+    return archivedMetaWrapper;
+  }
+
+  public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata(
+      HoodieCommitMetadata hoodieCommitMetadata) {
+    ObjectMapper mapper = new ObjectMapper();
+    // Need this to ignore other public get() methods
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    org.apache.hudi.avro.model.HoodieCommitMetadata avroMetaData =
+        mapper.convertValue(hoodieCommitMetadata, org.apache.hudi.avro.model.HoodieCommitMetadata.class);
+    // Do not archive Rolling Stats, cannot set to null since AVRO will throw null pointer
+    avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, "");
+    return avroMetaData;
+  }
+
+ // TODO : Fix converting from SpecificRecord to POJO
Review comment:
would this come back to haunt us ?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -401,13 +441,25 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   public void syncTableMetadata() {
     // Open up the metadata table again, for syncing
-    try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
+    try {
+      HoodieTableMetadataWriter writer =
+          SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
       LOG.info("Successfully synced to metadata table");
     } catch (Exception e) {
       throw new HoodieMetadataException("Error syncing to metadata table.", e);
     }
   }
+ @Override
+  protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+    // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
+    HoodieTable table = createTable(config, hadoopConf);
Review comment:
should we recreate the table? Again, we need to revisit this whole passing of the metadata writer to resolve conflicts; I don't understand this part.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+    implements ConflictResolutionStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public Stream<HoodieInstant> getInstantsStream(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant,
+                                                 Option<HoodieInstant> lastSuccessfulInstant) {
+
+    // To find which instants are conflicting, we apply the following logic
+    // 1. Get completed instants timeline only for commits that have happened since the last successful write.
+    // 2. Get any scheduled or completed compaction or clustering operations that have started and/or finished
+    // after the current instant. We need to check for write conflicts since they may have mutated the same files
+    // that are being newly created by the current write.
+    Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
+        .getCommitsTimeline()
+        // TODO : getWriteTimeline to ensure we include replace commits as well
+        .filterCompletedInstants()
+        .findInstantsAfter(lastSuccessfulInstant.isPresent() ? lastSuccessfulInstant.get().getTimestamp() : "0")
+        .getInstants();
+
+    Stream<HoodieInstant> compactionAndClusteringTimeline = activeTimeline
+        .getTimelineOfActions(CollectionUtils.createSet(REPLACE_COMMIT_ACTION, COMPACTION_ACTION))
+        .findInstantsAfter(currentInstant.getTimestamp())
+        .getInstants();
+    return Stream.concat(completedCommitsInstantStream, compactionAndClusteringTimeline);
+  }
+
+  @Override
+  public boolean hasConflict(HoodieCommitOperation thisOperation, HoodieCommitOperation otherOperation) {
+    // TODO : UUID's can clash even for insert/insert, handle that case.
+    Set<String> fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
+    Set<String> fileIdsSetForSecondInstant = otherOperation.getMutatedFileIds();
+    Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
+    intersection.retainAll(fileIdsSetForSecondInstant);
+    if (!intersection.isEmpty()) {
+      LOG.error("Found conflicting writes between first operation = " + thisOperation
+          + ", second operation = " + otherOperation + " , intersecting file ids " + intersection);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> resolveConflict(Option<HoodieBackedTableMetadataWriter> metadataWriter, HoodieTable table,
+                                                      HoodieCommitOperation thisOperation, HoodieCommitOperation otherOperation) {
+    // NOTE that any commits from table services such as compaction, clustering or cleaning since the
+    // overlapping of files is handled using MVCC. Since compaction is eventually written as commit, we need to ensure
+    // we handle this during conflict resolution and not treat the commit from compaction operation as a regular commit.
+    if (otherOperation.getOperationType() == WriteOperationType.UNKNOWN
Review comment:
but a replace commit can also result from INSERT_OVERWRITE, correct? How do we distinguish this? I feel we need a more nuanced check here, so that only writes fail each other.
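A more nuanced check along the lines the reviewer asks for might inspect the write operation type behind a replacecommit, so clustering (a table service) is exempt but INSERT_OVERWRITE (a real write) still conflicts. A self-contained sketch only: the operation types and file-id sets are modeled with plain Java types, not Hudi's real classes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch: only exempt replacecommits that come from table services (clustering,
// compaction), not ones produced by real writes such as INSERT_OVERWRITE.
public class ConflictCheckSketch {

  enum OpType { UPSERT, INSERT, INSERT_OVERWRITE, CLUSTER, COMPACT }

  static final Set<OpType> TABLE_SERVICE_OPS =
      new HashSet<>(Arrays.asList(OpType.CLUSTER, OpType.COMPACT));

  // A conflict is flagged only when both operations are real writes
  // and they mutated at least one common file id.
  static boolean hasConflict(OpType thisOp, Set<String> thisFiles,
                             OpType otherOp, Set<String> otherFiles) {
    if (TABLE_SERVICE_OPS.contains(thisOp) || TABLE_SERVICE_OPS.contains(otherOp)) {
      // Table service output is reconciled via MVCC, not failed.
      return false;
    }
    Set<String> intersection = new HashSet<>(thisFiles);
    intersection.retainAll(otherFiles);
    return !intersection.isEmpty();
  }
}
```

Under this shape, two INSERT_OVERWRITE writers touching the same file group would still fail each other, while a concurrent clustering replacecommit would not.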
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.HoodieCommitOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class TransactionUtils {
+
+  private static final Logger LOG = LogManager.getLogger(TransactionUtils.class);
+
+  /**
+   * Resolve any write conflicts when committing data.
+   * @param table
+   * @param metadataWriter
+   * @param currentTxnOwnerInstant
+   * @param thisCommitMetadata
+   * @param config
+   * @param lastCompletedTxnOwnerInstant
+   * @return
+   * @throws HoodieWriteConflictException
+   */
+  public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final HoodieTable table, final Option<HoodieBackedTableMetadataWriter> metadataWriter,
+                                                                       final Option<HoodieInstant> currentTxnOwnerInstant, final Option<HoodieCommitMetadata> thisCommitMetadata,
+                                                                       final HoodieWriteConfig config, Option<HoodieInstant> lastCompletedTxnOwnerInstant)
+      throws HoodieWriteConflictException {
+    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
+      Stream<HoodieInstant> instantStream = resolutionStrategy.getInstantsStream(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant);
+      // TODO : metadataWriter.reload() inside resolve write conflict ??
Review comment:
Do we need this? Is this TODO still relevant?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.lock.SimpleConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.lock.LockProvider;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS;
Review comment:
are these ever used in `hudi-common`? If we don't anticipate readers using this, we should just keep all this in `hudi-client-common` under a transaction package
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
-    return getTableAndInitCtx(metaClient, operationType);
+    if (HoodieTableVersion.current() != metaClient.getTableConfig().getTableVersion()) {
+      // TODO : Force clean up of all inflights, do this once pending rollback removal PR is landed
+      // this.rollbackFailedWrites();
+      this.txnManager.beginTransaction();
+      try {
+        // Ensure no inflight commits
+        TransactionUtils.resolveConflictIfAnyForUpgradeDowngrade(metaClient);
+        new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
+      } finally {
+        this.txnManager.endTransaction();
+      }
+    }
+    return getTableAndInitCtx(metaClient, operationType, instantTime);
   }
-  private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
+  // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
Review comment:
comment valid? What should we do about this issue? Can you elaborate?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.HoodieCommitOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class TransactionUtils {
+
+  private static final Logger LOG = LogManager.getLogger(TransactionUtils.class);
+
+  /**
+   * Resolve any write conflicts when committing data.
+   * @param table
+   * @param metadataWriter
+   * @param currentTxnOwnerInstant
+   * @param thisCommitMetadata
+   * @param config
+   * @param lastCompletedTxnOwnerInstant
+   * @return
+   * @throws HoodieWriteConflictException
+   */
+  public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final HoodieTable table, final Option<HoodieBackedTableMetadataWriter> metadataWriter,
Review comment:
unit test this method?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -401,13 +441,25 @@ protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
   @Override
   public void syncTableMetadata() {
     // Open up the metadata table again, for syncing
-    try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
+    try {
+      HoodieTableMetadataWriter writer =
Review comment:
we lose auto closing by moving to a regular try-catch. Why is this
change needed?
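For reference, the difference the reviewer points out can be seen with a stand-in `AutoCloseable` (a sketch; the real writer is Hudi's `HoodieTableMetadataWriter`): try-with-resources closes the resource on every path, while a plain try block leaks it unless `close()` is called explicitly.

```java
// Sketch contrasting try-with-resources with a plain try block.
public class AutoCloseSketch {

  static class TrackingWriter implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
      closed = true;
    }
  }

  // try-with-resources: close() runs automatically, even if the body throws.
  static TrackingWriter withResources() {
    TrackingWriter w = new TrackingWriter();
    try (TrackingWriter writer = w) {
      // ... sync work ...
    }
    return w;
  }

  // plain try: nothing closes the writer unless we do it ourselves.
  static TrackingWriter plainTry() {
    TrackingWriter w = new TrackingWriter();
    try {
      // ... sync work ...
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return w;
  }
}
```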
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/lock/LockProvider.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Pluggable lock implementations using this provider class.
+ */
+public abstract class LockProvider<T> implements Lock, AutoCloseable {
Review comment:
can this be an interface?
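An interface-based shape, as the reviewer suggests, could keep shared behavior via Java 8 default methods. A sketch only: the method names loosely follow `java.util.concurrent.locks.Lock`, and this is not Hudi's actual `LockProvider` API.

```java
import java.util.concurrent.TimeUnit;

// Sketch: LockProvider as an interface instead of an abstract class.
// Shared no-op / unsupported behavior moves into default methods, so
// implementations only supply the lock and unlock logic.
public interface LockProviderSketch<T> extends AutoCloseable {

  boolean tryLock(long time, TimeUnit unit);

  void unlock();

  // Implementations holding no native resources can inherit this no-op.
  @Override
  default void close() {
  }

  // Optional handle on the underlying lock; not all providers expose one.
  default T getLock() {
    throw new UnsupportedOperationException("no underlying lock exposed");
  }
}
```

An implementation then extends nothing, leaving its single superclass slot free, which is the usual argument for interface over abstract class here.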
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommonMetadata.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+
+public class HoodieCommonMetadata {
Review comment:
rename: HoodieMetadataWrapper
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
##########
@@ -222,6 +225,17 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
     LOG.info("Committing metadata bootstrap !!");
   }
+ @Override
+ protected void syncTableMetadata() {
Review comment:
I think all this can be removed from all the action executors. Can't we take the lock in post commit/write at the write client level?
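Taking the lock once at the write-client level, as suggested, is essentially a template method: the base client brackets commit with begin/end transaction, so individual action executors need no locking of their own. A minimal sketch under that assumption; the `TxnManager` here is a stand-in wrapping a plain lock, not Hudi's `TransactionManager`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: lock once around commit in the base client instead of in every executor.
public class WriteClientSketch {

  // Stand-in transaction manager wrapping a simple in-process lock.
  static class TxnManager {
    private final ReentrantLock lock = new ReentrantLock();
    void beginTransaction() { lock.lock(); }
    void endTransaction() { lock.unlock(); }
  }

  private final TxnManager txnManager = new TxnManager();
  final List<String> events = new ArrayList<>();

  // Template method: subclasses/executors only implement doCommit();
  // the locking discipline lives in one place.
  public final void commit(String instantTime) {
    txnManager.beginTransaction();
    try {
      events.add("begin");
      doCommit(instantTime);
      events.add("end");
    } finally {
      txnManager.endTransaction();
    }
  }

  protected void doCommit(String instantTime) {
    events.add("commit:" + instantTime);
  }
}
```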
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Configuration for managing locks. Since this configuration needs to be
shared with HiveMetaStore based lock,
+ * which is in a different package than other lock providers, we use this as a
data transfer object in hoodie-common
+ */
+public class LockConfiguration implements Serializable {
+
+ public static final String LOCK_PREFIX = "hoodie.writer.lock.";
+ public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP =
LOCK_PREFIX + "wait_time_ms_between_retry";
+ public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS =
String.valueOf(5000L);
+ public static final String
LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX +
"client.wait_time_ms_between_retry";
+ public static final String
DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(10000L);
+ public static final String LOCK_ACQUIRE_NUM_RETRIES_PROP = LOCK_PREFIX +
"num_retries";
+ public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES =
String.valueOf(3);
+ public static final String LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP =
LOCK_PREFIX + "client.num_retries";
+ public static final String DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES =
String.valueOf(0);
+ public static final String LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP = LOCK_PREFIX +
"wait_time_ms";
+ public static final int DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS = 60 * 1000;
+ // configs for file system based locks. NOTE: This only works for DFS with
atomic create/delete operation
+ public static final String FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX =
LOCK_PREFIX + "filesystem.";
+ public static final String FILESYSTEM_LOCK_PATH_PROP =
FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "path";
+ // configs for metastore based locks
+ public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX
+ "hivemetastore.";
+ public static final String HIVE_DATABASE_NAME_PROP =
HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "database";
+ public static final String HIVE_TABLE_NAME_PROP =
HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "table";
+ // Zookeeper configs for zk based locks
+ public static final String ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX =
LOCK_PREFIX + "zookeeper.";
+ public static final String ZK_BASE_PATH_PROP =
ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "zk_base_path";
Review comment:
I think the common term is `zookeeper chroot`?
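The lock properties defined in this class can be assembled into a plain `Properties` object; a minimal sketch of a ZooKeeper-based lock configuration is below. The key names are taken from the diff above, while the values (retry counts, the `/hudi/locks` base path) are illustrative placeholders only.

```java
import java.util.Properties;

// Sketch: building a ZooKeeper-based lock configuration from the property
// keys declared in LockConfiguration. Values here are examples, not defaults.
public class LockConfigExample {
    public static Properties zkLockProps() {
        Properties props = new Properties();
        props.setProperty("hoodie.writer.lock.num_retries", "3");
        props.setProperty("hoodie.writer.lock.wait_time_ms_between_retry", "5000");
        // "zk_base_path" is roughly what the ZooKeeper docs call a chroot path
        props.setProperty("hoodie.writer.lock.zookeeper.zk_base_path", "/hudi/locks");
        return props;
    }
}
```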
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -195,30 +126,24 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K,
O> table, HoodieInstan
@Override
public HoodieCleanMetadata execute() {
+ List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
// If there are inflight(failed) or previously requested clean operation,
first perform them
List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
.filterInflightsAndRequested().getInstants().collect(Collectors.toList());
if (pendingCleanInstants.size() > 0) {
pendingCleanInstants.forEach(hoodieInstant -> {
LOG.info("Finishing previously unfinished cleaner instant=" +
hoodieInstant);
try {
- runPendingClean(table, hoodieInstant);
+ cleanMetadataList.add(runPendingClean(table, hoodieInstant));
} catch (Exception e) {
LOG.warn("Failed to perform previous clean operation, instant: " +
hoodieInstant, e);
}
});
table.getMetaClient().reloadActiveTimeline();
}
-
- // Plan and execute a new clean action
- Option<HoodieCleanerPlan> cleanerPlanOpt = requestClean(instantTime);
- if (cleanerPlanOpt.isPresent()) {
- table.getMetaClient().reloadActiveTimeline();
- HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
- if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null) &&
!cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()) {
- return runClean(table,
HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan);
- }
- }
- return null;
+ // return the last clean metadata for now
+ // TODO (NA) : Clean only the earliest pending clean just like how we do
for other table services
Review comment:
I don't understand why we just pick the last clean metadata. Let's do
the generically right thing. If you want to handle more than one cleaning
operation, let's return a list?
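The shape of the suggestion can be sketched as follows: collect the result of every pending clean plus the newly planned one into a list, instead of dropping all but the last. `CleanResult` here is a stand-in for the real `HoodieCleanMetadata`, and the executor wiring is heavily simplified for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Sketch: execute() returning every clean's metadata rather than the last one.
public class CleanExecuteSketch {
    // Stand-in for HoodieCleanMetadata
    public static final class CleanResult {
        public final String instant;
        public CleanResult(String instant) { this.instant = instant; }
    }

    public static List<CleanResult> execute(List<String> pendingInstants,
                                            Supplier<CleanResult> newClean) {
        List<CleanResult> results = new ArrayList<>();
        for (String instant : pendingInstants) {
            results.add(new CleanResult(instant)); // stands in for runPendingClean(...)
        }
        CleanResult fresh = newClean.get();        // plan and run the new clean
        if (fresh != null) {
            results.add(fresh);
        }
        return results;                            // caller sees every clean, not just the last
    }
}
```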
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class BaseCleanPlanActionExecutor<T extends
HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O,
Option<HoodieCleanerPlan>> {
+
+ private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
+
+ private final Option<Map<String, String>> extraMetadata;
+
+ public BaseCleanPlanActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+ Option<Map<String, String>>
extraMetadata) {
+ super(context, config, table, instantTime);
+ this.extraMetadata = extraMetadata;
+ }
+
+ protected abstract Option<HoodieCleanerPlan> createCleanerPlan();
+
+ /**
+ * Generates List of files to be cleaned.
+ *
+ * @param context HoodieEngineContext
+ * @return Cleaner Plan
+ */
+ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
Review comment:
I assume this is all just code moved from the other class. If not,
please point out what has changed.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -43,7 +43,8 @@
public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy";
public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
public static final String ASYNC_CLEAN_PROP = "hoodie.clean.async";
-
+ // Turn on inline cleaning
+ public static final String INLINE_CLEAN_PROP = "hoodie.clean.inline";
Review comment:
Should there be a single property? i.e. `hoodie.clean.async=false` does
imply `hoodie.clean.inline=true`, right?
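The single-property idea boils down to: when automatic cleaning is on, inline vs. async is already fully determined by the async flag, so a separate inline flag may be redundant. The config names come from the diff; the resolution logic below is an illustration of the suggestion, not the actual Hudi code.

```java
// Sketch: deriving inline cleaning from the existing auto/async flags
// (hoodie.clean.automatic and hoodie.clean.async) instead of adding
// a separate hoodie.clean.inline property.
public class CleanModeSketch {
    public static boolean isInlineClean(boolean autoClean, boolean asyncClean) {
        // inline cleaning runs on the writer when auto clean is on and async is off
        return autoClean && !asyncClean;
    }
}
```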
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -195,30 +126,24 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K,
O> table, HoodieInstan
@Override
public HoodieCleanMetadata execute() {
+ List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
// If there are inflight(failed) or previously requested clean operation,
first perform them
List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
.filterInflightsAndRequested().getInstants().collect(Collectors.toList());
if (pendingCleanInstants.size() > 0) {
pendingCleanInstants.forEach(hoodieInstant -> {
LOG.info("Finishing previously unfinished cleaner instant=" +
hoodieInstant);
try {
- runPendingClean(table, hoodieInstant);
+ cleanMetadataList.add(runPendingClean(table, hoodieInstant));
} catch (Exception e) {
LOG.warn("Failed to perform previous clean operation, instant: " +
hoodieInstant, e);
}
});
table.getMetaClient().reloadActiveTimeline();
}
-
- // Plan and execute a new clean action
- Option<HoodieCleanerPlan> cleanerPlanOpt = requestClean(instantTime);
- if (cleanerPlanOpt.isPresent()) {
- table.getMetaClient().reloadActiveTimeline();
- HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
- if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null) &&
!cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()) {
- return runClean(table,
HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan);
- }
- }
- return null;
+ // return the last clean metadata for now
+ // TODO (NA) : Clean only the earliest pending clean just like how we do
for other table services
Review comment:
Won't this mess up the metadata table, e.g. by missing some deletes?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/lock/LockProvider.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Pluggable lock implementations using this provider class.
+ */
+public abstract class LockProvider<T> implements Lock, AutoCloseable {
Review comment:
If there is no shared code here, we should go for an interface vs. an
abstract class.
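Since `Lock` and `AutoCloseable` are themselves interfaces, `LockProvider` could extend them directly and keep any shared behavior in default methods. A minimal sketch of that interface form is below; the `InProcessLockProvider` backed by a `ReentrantLock`, and its `describe()` helper, are toy illustrations rather than anything in the PR.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: LockProvider as an interface instead of an abstract class.
interface LockProvider<T> extends Lock, AutoCloseable {
    T getLock();

    // default methods can carry whatever shared code the abstract class had
    default String describe() {
        return "LockProvider backed by " + getLock().getClass().getSimpleName();
    }
}

// Toy in-process implementation, for illustration only.
class InProcessLockProvider implements LockProvider<ReentrantLock> {
    private final ReentrantLock lock = new ReentrantLock();

    @Override public ReentrantLock getLock() { return lock; }
    @Override public void lock() { lock.lock(); }
    @Override public void lockInterruptibly() throws InterruptedException { lock.lockInterruptibly(); }
    @Override public boolean tryLock() { return lock.tryLock(); }
    @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return lock.tryLock(time, unit);
    }
    @Override public void unlock() { lock.unlock(); }
    @Override public Condition newCondition() { return lock.newCondition(); }
    @Override public void close() {
        // release the lock if this thread still holds it
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}
```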
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/HoodieMetadataConversionUtils.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.ReplaceArchivalHelper;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import java.io.IOException;
+
+/**
+ * Helper class to convert between different action related payloads and
{@link HoodieArchivedMetaEntry}.
+ */
+public class HoodieMetadataConversionUtils {
Review comment:
rename to `MetadataConversionUtils`?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -117,12 +126,24 @@ protected String getCommitActionType() {
protected void commitOnAutoCommit(HoodieWriteMetadata result) {
if (config.shouldAutoCommit()) {
LOG.info("Auto commit enabled: Committing " + instantTime);
- commit(extraMetadata, result);
+ autoCommit(extraMetadata, result);
} else {
LOG.info("Auto commit disabled for " + instantTime);
}
}
+ protected void autoCommit(Option<Map<String, String>> extraMetadata,
HoodieWriteMetadata<O> result) {
+ this.txnManager.beginTransaction();
+ try {
+ // TODO : Refactor this method so we can pass a valid metadata table
writer
Review comment:
revisit the todo?
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
##########
@@ -224,6 +231,16 @@ protected void completeCompaction(HoodieCommitMetadata
metadata,
return getTableAndInitCtx(metaClient, operationType);
}
+ @Override
+ protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+ // Create a Hoodie table after startTxn which encapsulated the commits and
files visible.
+ // Important to create this after the lock to ensure latest commits show
up in the timeline without need for reload
+ HoodieTable table = createTable(config, hadoopConf);
+ // TODO : Metadata Writer is not supported for Java
Review comment:
revisit comment.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.HoodieCommitOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class TransactionUtils {
+
+ private static final Logger LOG =
LogManager.getLogger(TransactionUtils.class);
+
+ /**
+ * Resolve any write conflicts when committing data.
+ * @param table
+ * @param metadataWriter
+ * @param currentTxnOwnerInstant
+ * @param thisCommitMetadata
+ * @param config
+ * @param lastCompletedTxnOwnerInstant
+ * @return
+ * @throws HoodieWriteConflictException
+ */
+ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final
HoodieTable table, final Option<HoodieBackedTableMetadataWriter> metadataWriter,
Review comment:
is the `metadataWriter` really used inside this method?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -93,4 +95,28 @@ private static HoodieCommitMetadata
buildMetadataFromStats(List<HoodieWriteStat>
+ "numReplaceFileIds:" +
partitionToReplaceFileIds.values().stream().mapToInt(e -> e.size()).sum());
return commitMetadata;
}
+
+ public static HashMap<String, String>
getFileIdWithoutSuffixAndRelativePaths(Map<String,
List<org.apache.hudi.avro.model.HoodieWriteStat>>
Review comment:
Let's unit test these?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -380,11 +391,40 @@ protected void
completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
@Override
protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType,
String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
- new SparkUpgradeDowngrade(metaClient, config, context).run(metaClient,
HoodieTableVersion.current(), config, context, instantTime);
- return getTableAndInitCtx(metaClient, operationType);
+ if (HoodieTableVersion.current() !=
metaClient.getTableConfig().getTableVersion()) {
+ // TODO : Force clean up of all inflights, do this once pending rollback
removal PR is landed
Review comment:
Let's clean this up?
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -265,6 +266,20 @@ public void rollbackBootstrap(HoodieEngineContext context,
String instantTime) {
throw new HoodieNotSupportedException("Bootstrap is not supported yet");
}
+ /**
+ * TODO :
+ * Refactor {@link FlinkCleanActionExecutor} to support scheduling of
cleaning.
+ * @param context HoodieEngineContext
+ * @param instantTime Instant Time for scheduling cleaning
+ * @param extraMetadata additional metadata to write into plan
+ * @return
+ */
+ @Override
+ public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext
context, String instantTime, Option<Map<String, String>> extraMetadata) {
Review comment:
this does not seem ok to do?
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -313,6 +320,16 @@ public void cleanHandles() {
return writeHandle;
}
+ @Override
+ protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+ // Create a Hoodie table after startTxn which encapsulated the commits and
files visible.
+ // Important to create this after the lock to ensure latest commits show
up in the timeline without need for reload
+ HoodieTable table = createTable(config, hadoopConf);
+ // TODO : Metadata Writer is not supported for Flink
Review comment:
revisit todo?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/TableService.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+
+/**
+ * Supported runtime table services.
+ */
+public enum TableService {
Review comment:
rename: TableServiceType
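Following the rename suggestion, the enum could look roughly like the sketch below, mapping each runtime table service to a timeline action string (the diff's import of `HoodieTimeline` suggests such a mapping). The exact set of services and action names here are illustrative assumptions, not the PR's actual contents.

```java
// Sketch: TableServiceType enum mapping each table service to an action name.
public enum TableServiceType {
    CLEAN("clean"),
    COMPACT("compaction"),
    CLUSTER("replacecommit"),
    ARCHIVE("archive");

    private final String action;

    TableServiceType(String action) {
        this.action = action;
    }

    public String getAction() {
        return action;
    }
}
```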
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]