vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r549550664
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
##########
@@ -48,6 +50,7 @@
protected final transient Configuration hadoopConf;
protected final HoodieWriteConfig config;
protected final String basePath;
+ protected AtomicReference<Option<HoodieInstant>>
latestWriteInstantCompletedBeforeWriter = new AtomicReference<>();
Review comment:
AtomicReference to handle multiple threads using the same client object?
+1
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -203,6 +208,45 @@ public boolean commitStats(String instantTime,
List<HoodieWriteStat> stats, Opti
return true;
}
+ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String,
List<String>> partitionToReplaceFileIds) {
+
+ HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats,
partitionToReplaceFileIds, extraMetadata,
+ operationType, config.getSchema(), commitActionType);
+ if (config.isMultiWriterEnabled()) {
+ // get strategy and lock type
+ LockConfiguration lockConfiguration = new
LockConfiguration(config.getProps());
+ LockProvider lockProvider = (LockProvider)
ReflectionUtils.loadClass(config.getLockProviderClass(),
+ lockConfiguration, fs.getConf());
+ try {
+ // TODO : Get timeout and set the timeout
+ boolean acquired = lockProvider.tryLock();
+ if (!acquired) {
+ throw new HoodieException("Unable to acquire lock");
Review comment:
Could we add some more context to this message (e.g. the basePath), and chain the inner exception if there is one?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -203,6 +208,45 @@ public boolean commitStats(String instantTime,
List<HoodieWriteStat> stats, Opti
return true;
}
+ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String,
List<String>> partitionToReplaceFileIds) {
+
+ HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats,
partitionToReplaceFileIds, extraMetadata,
+ operationType, config.getSchema(), commitActionType);
+ if (config.isMultiWriterEnabled()) {
+ // get strategy and lock type
+ LockConfiguration lockConfiguration = new
LockConfiguration(config.getProps());
+ LockProvider lockProvider = (LockProvider)
ReflectionUtils.loadClass(config.getLockProviderClass(),
Review comment:
can we move the reflection part to the config object?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -821,6 +865,38 @@ protected void finalizeWrite(HoodieTable<T, I, K, O>
table, String instantTime,
}
}
+ private HoodieCommitMetadata resolveWriteConflictIfAny(HoodieTable<T, I, K,
O> table, final String instantTime,
+ HoodieCommitMetadata
thisCommitMetadata) {
+ Long currentTimeInSecs = System.currentTimeMillis() / 1000;
+ ConflictResolutionStrategy resolutionStrategy =
config.getWriteConflictResolutionStrategy();
+ Option<HoodieInstant> lastCompletedInstantBeforeWriterStarted =
this.latestWriteInstantCompletedBeforeWriter.get();
+ String lastInstantTimestamp =
lastCompletedInstantBeforeWriterStarted.isPresent()
+ ? lastCompletedInstantBeforeWriterStarted.get().getTimestamp() : "0";
+ Stream<HoodieInstant> instantStream = table.getActiveTimeline()
+ .getAllCommitsTimeline()
+ .getCommitsAndCompactionTimeline()
+ .filterCompletedInstants()
+ .findInstantsInRange(lastInstantTimestamp,
String.valueOf(currentTimeInSecs))
+ .getInstants();
+
+ LOG.info("Current eligible instants during write oHoodieWriteConfigf
instant " + instantTime + " = "
Review comment:
Typos in this log message: "oHoodieWriteConfigf" looks like "HoodieWriteConfig" was accidentally pasted into the middle of the word "of" — please fix.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static
org.apache.hudi.common.config.LockConfiguration.HOODIE_LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.HOODIE_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able
to use this lock.
+ */
+public class ZookeeperBasedLockProvider extends LockProvider {
+
+ private static final Logger LOG =
LogManager.getLogger(ZookeeperBasedLockProvider.class);
+
+ private CuratorFramework curatorFrameworkClient;
+ private final AtomicReference<InterProcessMutex> lock = new
AtomicReference<>();
+
+ public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ this(lockConfiguration);
+ this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
+
.connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP))
+ .retryPolicy(new
BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(HOODIE_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
+ 5000,
lockConfiguration.getConfig().getInteger(HOODIE_LOCK_ACQUIRE_NUM_RETRIES_PROP)))
+ .namespace(lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP))
+
.sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP,
DEFAULT_ZK_SESSION_TIMEOUT_MS))
+
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
+ .build();
+ this.curatorFrameworkClient.start();
+ }
+
+ public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration,
final CuratorFramework curatorFramework) {
+ this(lockConfiguration);
+ this.curatorFrameworkClient = curatorFramework;
+ this.curatorFrameworkClient.start();
+ }
+
+ ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ }
+
+ @Override
+ public void acquireLock() throws Exception {
+ ValidationUtils.checkArgument(this.lock.get() == null, "Lock is already
acquired");
+ InterProcessMutex newLock = new InterProcessMutex(
+ this.curatorFrameworkClient,
lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP) + "/"
+ + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP));
+ newLock.acquire();
+ lock.compareAndSet(null, newLock);
+ }
+
+ @Override
+ public boolean tryLock() {
+ LOG.info("Trying to acquire lock for ZkBasePath " +
lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP)
+ + " and lock key " +
this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP));
+ try {
+ acquireLock();
+ } catch (Exception e) {
+ throw new HoodieLockException("Unable to acquire lock", e);
+ }
+ return lock.get() != null && lock.get().isAcquiredInThisProcess();
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ LOG.info("Releasing lock for ZkBasePath "
+ + lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP) + " and
lock key "
+ + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP));
+ if (lock.get() == null) {
+ return;
+ }
+ lock.get().release();
+ lock.set(null);
+ LOG.info("Released lock");
Review comment:
Could this log line include more context, e.g. the instant time or how long the lock was held?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import
org.apache.hudi.client.lock.ConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.FileSystemBasedLockProvider;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.lock.LockProvider;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_HOODIE_LOCK_ACQUIRE_NUM_RETRIES;
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_HOODIE_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
+import static
org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.HOODIE_LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.HOODIE_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.HOODIE_LOCK_PREFIX;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_PORT_PROP;
+
+/**
+ * Hoodie lock related configs.
+ */
+public class HoodieLockConfig extends DefaultHoodieConfig {
+
+ // Pluggable type of lock provider
+ public static final String HOODIE_LOCK_PROVIDER_CLASS_PROP =
HOODIE_LOCK_PREFIX + "provider";
Review comment:
Let's drop the `HOODIE_` prefix from the properties, per how we have named the configs thus far.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/lock/LockProvider.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Pluggable lock implementations using this provider class.
+ */
+public abstract class LockProvider implements Lock, AutoCloseable {
+
+ private static final Logger LOG = LogManager.getLogger(LockProvider.class);
+
+ protected LockConfiguration lockConfiguration;
+
+ protected abstract void acquireLock() throws Exception;
+
+ @Override
+ public final void lock() {
+ LOG.info("Acquiring lock");
+ try {
+ acquireLock();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ LOG.info("Acquired lock");
+ }
+
+ @Override
+ public final void lockInterruptibly() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final boolean tryLock(long time, TimeUnit unit) {
Review comment:
We should probably support this, right? Even over `lock()`, since this variant has a timeout param.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -118,7 +118,7 @@ public
SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getSchema()));
return ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends
HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(), table,
context, config))
- .performClustering(inputRecords,
clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams,
readerSchema);
+ .performClustering(inputRecords, 0, instantTime, strategyParams,
readerSchema);
Review comment:
why is this relevant here?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -203,6 +208,45 @@ public boolean commitStats(String instantTime,
List<HoodieWriteStat> stats, Opti
return true;
}
+ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String,
List<String>> partitionToReplaceFileIds) {
+
+ HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats,
partitionToReplaceFileIds, extraMetadata,
+ operationType, config.getSchema(), commitActionType);
+ if (config.isMultiWriterEnabled()) {
+ // get strategy and lock type
+ LockConfiguration lockConfiguration = new
LockConfiguration(config.getProps());
+ LockProvider lockProvider = (LockProvider)
ReflectionUtils.loadClass(config.getLockProviderClass(),
+ lockConfiguration, fs.getConf());
+ try {
+ // TODO : Get timeout and set the timeout
+ boolean acquired = lockProvider.tryLock();
+ if (!acquired) {
+ throw new HoodieException("Unable to acquire lock");
+ }
+ LOG.info("Acquired lock for instant time " + instantTime);
+ // TODO : Move the following to a "critical section"
Review comment:
Is this TODO still valid?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -203,6 +208,45 @@ public boolean commitStats(String instantTime,
List<HoodieWriteStat> stats, Opti
return true;
}
+ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String,
List<String>> partitionToReplaceFileIds) {
+
+ HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats,
partitionToReplaceFileIds, extraMetadata,
+ operationType, config.getSchema(), commitActionType);
+ if (config.isMultiWriterEnabled()) {
+ // get strategy and lock type
+ LockConfiguration lockConfiguration = new
LockConfiguration(config.getProps());
+ LockProvider lockProvider = (LockProvider)
ReflectionUtils.loadClass(config.getLockProviderClass(),
+ lockConfiguration, fs.getConf());
+ try {
+ // TODO : Get timeout and set the timeout
+ boolean acquired = lockProvider.tryLock();
+ if (!acquired) {
+ throw new HoodieException("Unable to acquire lock");
+ }
+ LOG.info("Acquired lock for instant time " + instantTime);
+ // TODO : Move the following to a "critical section"
+ // Create a Hoodie table which encapsulated the commits and files
visible.
+ // Important to create this after the lock to ensure latest commits
show up in the timeline without need for reload
+ HoodieTable table = createTable(config, hadoopConf);
+ try {
+ metadata = resolveWriteConflictIfAny(table, instantTime, metadata);
+ return commitStats(instantTime, stats, extraMetadata,
commitActionType, Collections.emptyMap(), metadata);
+ } catch (Exception e) {
Review comment:
We need a new exception class here — `HoodieWriteConflictException` or something similar.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -203,6 +208,45 @@ public boolean commitStats(String instantTime,
List<HoodieWriteStat> stats, Opti
return true;
}
+ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String,
List<String>> partitionToReplaceFileIds) {
+
+ HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats,
partitionToReplaceFileIds, extraMetadata,
+ operationType, config.getSchema(), commitActionType);
+ if (config.isMultiWriterEnabled()) {
Review comment:
Can we use the concurrency mode config introduced by the other PR to fence this block instead?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -203,6 +208,45 @@ public boolean commitStats(String instantTime,
List<HoodieWriteStat> stats, Opti
return true;
}
+ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String,
List<String>> partitionToReplaceFileIds) {
+
+ HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats,
partitionToReplaceFileIds, extraMetadata,
+ operationType, config.getSchema(), commitActionType);
+ if (config.isMultiWriterEnabled()) {
+ // get strategy and lock type
+ LockConfiguration lockConfiguration = new
LockConfiguration(config.getProps());
+ LockProvider lockProvider = (LockProvider)
ReflectionUtils.loadClass(config.getLockProviderClass(),
+ lockConfiguration, fs.getConf());
+ try {
+ // TODO : Get timeout and set the timeout
+ boolean acquired = lockProvider.tryLock();
+ if (!acquired) {
+ throw new HoodieException("Unable to acquire lock");
+ }
+ LOG.info("Acquired lock for instant time " + instantTime);
+ // TODO : Move the following to a "critical section"
+ // Create a Hoodie table which encapsulated the commits and files
visible.
+ // Important to create this after the lock to ensure latest commits
show up in the timeline without need for reload
+ HoodieTable table = createTable(config, hadoopConf);
+ try {
+ metadata = resolveWriteConflictIfAny(table, instantTime, metadata);
+ return commitStats(instantTime, stats, extraMetadata,
commitActionType, Collections.emptyMap(), metadata);
+ } catch (Exception e) {
+ // if strategy throws exception, first release lock
+ lockProvider.unlock();
Review comment:
Why unlock here again, when it is already done in the finally block?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -821,6 +865,38 @@ protected void finalizeWrite(HoodieTable<T, I, K, O>
table, String instantTime,
}
}
+ private HoodieCommitMetadata resolveWriteConflictIfAny(HoodieTable<T, I, K,
O> table, final String instantTime,
+ HoodieCommitMetadata
thisCommitMetadata) {
+ Long currentTimeInSecs = System.currentTimeMillis() / 1000;
+ ConflictResolutionStrategy resolutionStrategy =
config.getWriteConflictResolutionStrategy();
+ Option<HoodieInstant> lastCompletedInstantBeforeWriterStarted =
this.latestWriteInstantCompletedBeforeWriter.get();
+ String lastInstantTimestamp =
lastCompletedInstantBeforeWriterStarted.isPresent()
+ ? lastCompletedInstantBeforeWriterStarted.get().getTimestamp() : "0";
+ Stream<HoodieInstant> instantStream = table.getActiveTimeline()
+ .getAllCommitsTimeline()
+ .getCommitsAndCompactionTimeline()
+ .filterCompletedInstants()
+ .findInstantsInRange(lastInstantTimestamp,
String.valueOf(currentTimeInSecs))
+ .getInstants();
+
+ LOG.info("Current eligible instants during write oHoodieWriteConfigf
instant " + instantTime + " = "
+ + instantStream.collect(Collectors.toList()));
+
+ boolean hasConflict = instantStream.anyMatch(instant -> {
+ try {
+ return resolutionStrategy.hasConflict(thisCommitMetadata,
HoodieCommitMetadata.fromBytes(
+ table.getActiveTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class));
+ } catch (IOException io) {
+ throw new HoodieCommitException("Unable to determine if conflict
exists", io);
+ }
+ });
+ HoodieCommitMetadata newMetadataAfterConflictResolution =
thisCommitMetadata;
+ if (hasConflict) {
+ newMetadataAfterConflictResolution =
resolutionStrategy.resolveConflict(config, hadoopConf);
Review comment:
We should probably have an API like resolveConflict(instant1, instant2), i.e. similar to our preCombine method. That would let resolutionStrategy implementors reason about just two instants — whether they conflict, and if they do, how to merge the commit metadata.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -821,6 +865,38 @@ protected void finalizeWrite(HoodieTable<T, I, K, O>
table, String instantTime,
}
}
+ private HoodieCommitMetadata resolveWriteConflictIfAny(HoodieTable<T, I, K,
O> table, final String instantTime,
+ HoodieCommitMetadata
thisCommitMetadata) {
+ Long currentTimeInSecs = System.currentTimeMillis() / 1000;
Review comment:
I was expecting this to use the atomic reference above, and just pull all the
writes that happened after `latestWriteInstantCompletedBeforeWriter`. Also,
let's not assume that time in seconds is what we will use here.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ConcurrentFileWritesConflictResolutionStrategy
+ implements ConflictResolutionStrategy {
+
+ private static final Logger LOG =
LogManager.getLogger(ConcurrentFileWritesConflictResolutionStrategy.class);
+
+ @Override
+ public boolean hasConflict(HoodieCommitMetadata firstInstant,
HoodieCommitMetadata secondInstant) {
+ // TODO : Ensure file ids are only the UUID and not the full name, also
cann UUID's clash then for insert/insert ?
+ Set<String> fileIdsSetForFirstInstant =
firstInstant.getFileIdWithoutSuffixAndRelativePaths().keySet();
+ Set<String> fileIdsSetForSecondInstant =
secondInstant.getFileIdWithoutSuffixAndRelativePaths().keySet();
+ Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
+ intersection.retainAll(fileIdsSetForSecondInstant);
+ if (!intersection.isEmpty()) {
+ LOG.error("Found conflicting writes " + intersection);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public HoodieCommitMetadata resolveConflict(HoodieWriteConfig config,
Configuration configuration) {
+ throw new UnsupportedOperationException("Cannot resolve conflicts for
overlapping writes");
Review comment:
Please throw a special-purpose exception here.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConflictResolutionStrategy.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+/**
+ * Strategy for conflict resolution with multiple writers. Provide pluggable
implementations for different
+ * kinds of strategies to execute to resolve conflicts when multiple writers
are mutating the hoodie table.
+ */
+public interface ConflictResolutionStrategy {
+
+ boolean hasConflict(HoodieCommitMetadata c1, HoodieCommitMetadata c2);
Review comment:
please add APIMaturity annotations and javadocs.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConcurrentFileWritesConflictResolutionStrategy.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ConcurrentFileWritesConflictResolutionStrategy
+ implements ConflictResolutionStrategy {
+
+ private static final Logger LOG =
LogManager.getLogger(ConcurrentFileWritesConflictResolutionStrategy.class);
+
+ @Override
+ public boolean hasConflict(HoodieCommitMetadata firstInstant,
HoodieCommitMetadata secondInstant) {
+ // TODO : Ensure file ids are only the UUID and not the full name, also
cann UUID's clash then for insert/insert ?
+ Set<String> fileIdsSetForFirstInstant =
firstInstant.getFileIdWithoutSuffixAndRelativePaths().keySet();
+ Set<String> fileIdsSetForSecondInstant =
secondInstant.getFileIdWithoutSuffixAndRelativePaths().keySet();
+ Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
+ intersection.retainAll(fileIdsSetForSecondInstant);
+ if (!intersection.isEmpty()) {
+ LOG.error("Found conflicting writes " + intersection);
Review comment:
Please include more information in these logs, e.g. which instants produced the conflicting file ids.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/FileSystemBasedLockProvider.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
+
+import java.io.IOException;
+
+import static
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.HOODIE_LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.HOODIE_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+/**
+ * This lock provider is used for testing purposes only. It provides a simple
file system based lock using HDFS atomic
+ * create operation. This lock does not support cleaning/expiring the lock
after a failed write hence cannot be used
+ * in production environments.
+ */
+public class FileSystemBasedLockProvider extends LockProvider {
+
+ private static final String LOCK_NAME = "acquired";
+
+ private String lockPath;
+ private FileSystem fs;
+
+ public FileSystemBasedLockProvider(LockConfiguration lockConfiguration,
final Configuration configuration) {
+ try {
+ this.lockConfiguration = lockConfiguration;
+ this.lockPath =
lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP);
+ this.fs = FileSystem.get(configuration);
Review comment:
please use FSUtils.getFS() always ! to get the filesystem object
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -122,6 +123,10 @@
private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED =
"hoodie.merge.data.validation.enabled";
private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED =
"false";
+ // Enable multi writer support
+ private static final String HOODIE_TABLE_MULTIWRITER_ENABLED_PROP =
"hoodie.table.multiwriter.enabled";
Review comment:
Repeating my earlier comment: let's just use the concurrency mode config
from the other PR.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/FileSystemBasedLockProvider.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
+
+import java.io.IOException;
+
+import static
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.HOODIE_LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.HOODIE_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+/**
+ * This lock provider is used for testing purposes only. It provides a simple
file system based lock using HDFS atomic
+ * create operation. This lock does not support cleaning/expiring the lock
after a failed write hence cannot be used
+ * in production environments.
+ */
+public class FileSystemBasedLockProvider extends LockProvider {
+
+ private static final String LOCK_NAME = "acquired";
+
+ private String lockPath;
+ private FileSystem fs;
+
+ public FileSystemBasedLockProvider(LockConfiguration lockConfiguration,
final Configuration configuration) {
+ try {
+ this.lockConfiguration = lockConfiguration;
+ this.lockPath =
lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP);
+ this.fs = FileSystem.get(configuration);
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to create file systems", io);
+ }
+ }
+
+ @Override
+ public void acquireLock() {
+ try {
+ fs.create(new Path(lockPath + "/" + LOCK_NAME)).close();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to acquire lock", e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ fs.close();
+ }
+
+ @Override
+ public boolean tryLock() {
+ try {
+ int numRetries = 0;
+ while (fs.exists(new Path(lockPath + "/" + LOCK_NAME))
+ && (numRetries <=
lockConfiguration.getConfig().getInteger(HOODIE_LOCK_ACQUIRE_NUM_RETRIES_PROP)))
{
+
Thread.sleep(lockConfiguration.getConfig().getInteger(HOODIE_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP));
+ }
+ acquireLock();
+ return true;
+ } catch (IOException | InterruptedException e) {
+ throw new HoodieLockException("Failed to acquire lock", e);
+ }
+ }
+
+ @Override
+ public void unlock() {
Review comment:
What's the guarantee that `unlock()` will fail for thread B if thread A
had actually created the lock file? I don't think this will work. We should
remove this implementation if we cannot get it right, IMO.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/FileSystemBasedLockProvider.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
+
+import java.io.IOException;
+
+import static
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.HOODIE_LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static
org.apache.hudi.common.config.LockConfiguration.HOODIE_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+/**
+ * This lock provider is used for testing purposes only. It provides a simple
file system based lock using HDFS atomic
+ * create operation. This lock does not support cleaning/expiring the lock
after a failed write hence cannot be used
+ * in production environments.
+ */
+public class FileSystemBasedLockProvider extends LockProvider {
+
+ private static final String LOCK_NAME = "acquired";
+
+ private String lockPath;
+ private FileSystem fs;
+
+ public FileSystemBasedLockProvider(LockConfiguration lockConfiguration,
final Configuration configuration) {
+ try {
+ this.lockConfiguration = lockConfiguration;
+ this.lockPath =
lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP);
+ this.fs = FileSystem.get(configuration);
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to create file systems", io);
+ }
+ }
+
+ @Override
+ public void acquireLock() {
+ try {
+ fs.create(new Path(lockPath + "/" + LOCK_NAME)).close();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to acquire lock", e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ fs.close();
+ }
+
+ @Override
+ public boolean tryLock() {
+ try {
+ int numRetries = 0;
+ while (fs.exists(new Path(lockPath + "/" + LOCK_NAME))
+ && (numRetries <=
lockConfiguration.getConfig().getInteger(HOODIE_LOCK_ACQUIRE_NUM_RETRIES_PROP)))
{
+
Thread.sleep(lockConfiguration.getConfig().getInteger(HOODIE_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP));
+ }
+ acquireLock();
+ return true;
+ } catch (IOException | InterruptedException e) {
+ throw new HoodieLockException("Failed to acquire lock", e);
+ }
+ }
+
+ @Override
+ public void unlock() {
Review comment:
which is indeed hard. I don't think it's possible on cloud stores.
##########
File path:
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
##########
@@ -284,7 +284,7 @@ private Object typeConvert(Schema.Field field) {
private Object generateFixedType(Schema localSchema) {
// TODO: Need to implement valid data generation for fixed type
GenericFixed genericFixed = new GenericData.Fixed(localSchema);
- switch (localSchema.getLogicalType().getName()) {
+ switch (localSchema.getType().getName()) {
Review comment:
why this change?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/lock/LockProvider.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Pluggable lock implementations using this provider class.
+ */
+public abstract class LockProvider implements Lock, AutoCloseable {
Review comment:
can this be an interface?
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -314,7 +313,7 @@ object AvroConversionHelper {
} else {
val sourceArray = item.asInstanceOf[Seq[Any]]
val sourceArraySize = sourceArray.size
- val targetList = new util.ArrayList[Any](sourceArraySize)
+ val targetList = new java.util.ArrayList[Any](sourceArraySize)
Review comment:
back these changes out?
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -216,6 +222,71 @@ public void testDeduplicationOnUpsert() throws Exception {
testDeduplication(SparkRDDWriteClient::upsert);
}
+ @Test
+ public void testMultiWriter() throws Exception {
Review comment:
Let's add a new test around multi-writer scenarios, please. It should ideally
be agnostic of COW/MOR, and we can just test MOR.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import java.util.Properties;
+
+/**
+ * Configuration for managing locks. Since this configuration needs to be
shared with HiveMetaStore based lock,
+ * which is in a different package than other lock providers, we use this as a
data transfer object in hoodie-common
+ */
+public class LockConfiguration {
Review comment:
What's the reason to have this in `hudi-common`? Locking is only used by
writing, correct?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -385,6 +386,10 @@ protected void
completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<
} else {
writeTimer = metrics.getDeltaCommitCtx();
}
+
latestWriteInstantCompletedBeforeWriter.set(table.getMetaClient().getActiveTimeline().getAllCommitsTimeline()
Review comment:
So, I was expecting we would do some kind of atomic swap, not just a set.
If you just want to set, even a volatile variable is fine, right?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import java.util.Properties;
+
+/**
+ * Configuration for managing locks. Since this configuration needs to be
shared with HiveMetaStore based lock,
+ * which is in a different package than other lock providers, we use this as a
data transfer object in hoodie-common
+ */
+public class LockConfiguration {
+
+ public static final String HOODIE_LOCK_PREFIX = "hoodie.writer.lock.";
+ public static final String
HOODIE_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP = HOODIE_LOCK_PREFIX +
"wait_time_ms";
Review comment:
Drop the `HOODIE_` ?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
##########
@@ -48,6 +50,7 @@
protected final transient Configuration hadoopConf;
protected final HoodieWriteConfig config;
protected final String basePath;
+ protected AtomicReference<Option<HoodieInstant>>
latestWriteInstantCompletedBeforeWriter = new AtomicReference<>();
Review comment:
Since this is specific to writing, can we put this in the
AbstractHoodieWriteClient subclass?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
##########
@@ -109,6 +108,17 @@ public void setCompacted(Boolean compacted) {
return filePaths;
}
+ public HashMap<String, String> getFileIdWithoutSuffixAndRelativePaths() {
Review comment:
can this sit somewhere else? in a helper method?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]