This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b44e19cc2fd [HUDI-8930] Fixing concurrency handling during upgrade  
(#12737)
b44e19cc2fd is described below

commit b44e19cc2fda5ec3c4eeea649399677a187c4fa8
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Jan 30 00:32:25 2025 -0800

    [HUDI-8930] Fixing concurrency handling during upgrade  (#12737)
    
    * minor fixes to upgrade path
    
    * Fixes for concurrency handling during upgrade
    
    * fix build failure
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   2 +-
 .../hudi/table/upgrade/UpgradeDowngradeUtils.java  |  15 ++-
 .../apache/hudi/config/TestHoodieWriteConfig.java  |  37 ++++++
 .../client/transaction/lock/NoopLockProvider.java  |  84 ++++++++++++
 .../common/config/HoodieTimeGeneratorConfig.java   |  18 +++
 .../hudi/common/table/timeline/TimeGenerators.java |  22 +++-
 .../transaction/lock/TestNoopLockProvider.java     | 145 +++++++++++++++++++++
 .../table/timeline/TestWaitBasedTimeGenerator.java |  29 +++++
 .../hudi/functional/TestSevenToEightUpgrade.scala  |  23 +++-
 9 files changed, 360 insertions(+), 15 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 85a3a1f6bc0..ddc0aa8bc46 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -3496,7 +3496,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
     private void autoAdjustConfigsForConcurrencyMode(boolean 
isLockProviderPropertySet) {
       // for a single writer scenario, with all table services inline, lets 
set InProcessLockProvider
-      if (writeConfig.getWriteConcurrencyMode() == 
WriteConcurrencyMode.SINGLE_WRITER && !writeConfig.areAnyTableServicesAsync()) {
+      if (writeConfig.isAutoAdjustLockConfigs() && 
writeConfig.getWriteConcurrencyMode() == WriteConcurrencyMode.SINGLE_WRITER && 
!writeConfig.areAnyTableServicesAsync()) {
         if (writeConfig.getLockProviderClass() != null && 
!writeConfig.getLockProviderClass().equals(InProcessLockProvider.class.getCanonicalName()))
 {
           // add logs only when explicitly overridden by the user.
           LOG.warn(String.format("For a single writer mode, overriding lock 
provider class (%s) to %s. So, user configured lock provider %s may not take 
effect",
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
index e5618d69a35..08a0ce4b1d2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -166,9 +167,17 @@ public class UpgradeDowngradeUtils {
       // set required configs for rollback
       
HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone());
       // NOTE: at this stage rollback should use the current writer version 
and disable auto upgrade/downgrade
-      TypedProperties properties = config.getProps();
-      properties.remove(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key());
-      properties.remove(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key());
+      TypedProperties properties = new TypedProperties();
+      properties.putAll(config.getProps());
+      // TimeGenerators are cached and re-used based on table base path. Since here we are changing the lock configurations, we avoid the cache
+      // for the upgrade code block.
+      
properties.put(HoodieTimeGeneratorConfig.TIME_GENERATOR_REUSE_ENABLE.key(),"false");
+      // Override w/ NoopLockProvider to avoid re-entrant locking; the upgrade is already happening within the table-level lock.
+      // Below we do trigger rollback and compaction, which might again try to acquire the lock. So, here we are explicitly overriding to
+      // NoopLockProvider for just the upgrade code block.
+      
properties.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),"org.apache.hudi.client.transaction.lock.NoopLockProvider");
+      // if auto adjust is not disabled, chances are that InProcessLockProvider will get overridden for single writer use-cases.
+      properties.put("hoodie.auto.adjust.lock.configs","false");
       HoodieWriteConfig rollbackWriteConfig = HoodieWriteConfig.newBuilder()
           .withProps(properties)
           .withWriteTableVersion(tableVersion.versionCode())
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 69a48e2fd7b..cb46cec8242 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -20,6 +20,7 @@ package org.apache.hudi.config;
 
 import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.client.transaction.lock.NoopLockProvider;
 import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
@@ -529,6 +530,42 @@ public class TestHoodieWriteConfig {
         HoodieFailedWritesCleaningPolicy.LAZY, 
FileSystemBasedLockProviderTestClass.class.getName());
   }
 
+  @Test
+  public void testTimeGeneratorConfig() {
+
+    HoodieWriteConfig writeConfig = createWriteConfig(new HashMap<String, 
String>() {
+      {
+        put(HoodieTableConfig.TYPE.key(), 
HoodieTableType.COPY_ON_WRITE.name());
+      }
+    });
+
+    // validate that the InProcessLockProvider kicks in if no explicit lock provider is set.
+    assertEquals(InProcessLockProvider.class.getName(), 
writeConfig.getTimeGeneratorConfig().getLockConfiguration().getConfig().getProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()));
+
+    writeConfig = createWriteConfig(new HashMap<String, String>() {
+      {
+        put(HoodieTableConfig.TYPE.key(), 
HoodieTableType.COPY_ON_WRITE.name());
+        put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
NoopLockProvider.class.getName());
+      }
+    });
+
+    // validate that the configured lock provider is honored by the TimeGeneratorConfig as well.
+    assertEquals(NoopLockProvider.class.getName(), 
writeConfig.getTimeGeneratorConfig().getLockConfiguration().getConfig().getProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()));
+
+    // if auto adjust lock config is enabled, for a single writer w/ all inline table services, InProcessLockProvider is overridden
+    writeConfig = createWriteConfig(new HashMap<String, String>() {
+      {
+        put(HoodieTableConfig.TYPE.key(), 
HoodieTableType.COPY_ON_WRITE.name());
+        put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
NoopLockProvider.class.getName());
+        put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
+      }
+    });
+
+    // validate that the InProcessLockProvider kicks in due to auto adjust lock configs
+    assertEquals(InProcessLockProvider.class.getName(), 
writeConfig.getTimeGeneratorConfig().getLockConfiguration().getConfig().getProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()));
+
+  }
+
   @Test
   public void testConsistentBucketIndexDefaultClusteringConfig() {
     Properties props = new Properties();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/NoopLockProvider.java
 
b/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/NoopLockProvider.java
new file mode 100644
index 00000000000..65c6aa32b32
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/NoopLockProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * NoopLockProvider as the name suggests, is a no op lock provider. Any caller 
asking for a lock will be able to get hold of the lock.
+ * This is not meant to be used as a production-grade lock provider. This is meant to be used for Hudi's internal operations.
+ * For eg: During upgrade, we have nested lock situations and we leverage this 
{@code NoopLockProvider} for any operations we
+ * might want to do within the upgradeHandler blocks to avoid re-entrant 
situations. Not all lock providers might support re-entrancy and during upgrade,
+ * it is expected to have a single writer to the Hudi table of interest.
+ */
+public class NoopLockProvider implements LockProvider<ReentrantReadWriteLock>, 
Serializable {
+
+  public NoopLockProvider(final LockConfiguration lockConfiguration, final 
StorageConfiguration<?> conf) {
+    // no op.
+  }
+
+  @Override
+  public boolean tryLock(long time, @NotNull TimeUnit unit) throws 
InterruptedException {
+    return true;
+  }
+
+  @Override
+  public void unlock() {
+    // no op.
+  }
+
+  @Override
+  public void lockInterruptibly() {
+    // no op.
+  }
+
+  @Override
+  public void lock() {
+    // no op.
+  }
+
+  @Override
+  public boolean tryLock() {
+    return true;
+  }
+
+  @Override
+  public ReentrantReadWriteLock getLock() {
+    return new ReentrantReadWriteLock();
+  }
+
+  @Override
+  public String getCurrentOwnerLockInfo() {
+    return StringUtils.EMPTY_STRING;
+  }
+
+  @Override
+  public void close() {
+    // no op.
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java
index d48a9d0d8bc..16ebff9135b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java
@@ -56,6 +56,15 @@ public class HoodieTimeGeneratorConfig extends HoodieConfig {
       .withDocumentation("The max expected clock skew time in ms between two 
processes generating time. Used by "
           + TimeGeneratorType.WAIT_TO_ADJUST_SKEW.name() + " time generator to 
implement TrueTime semantics.");
 
+  public static final ConfigProperty<Boolean> TIME_GENERATOR_REUSE_ENABLE = 
ConfigProperty
+      .key("_hoodie.time.generator.reuse.enable")
+      .defaultValue(true)
+      .sinceVersion("1.0.1")
+      .markAdvanced()
+      .withDocumentation("Used only for internal purposes. TimeGenerators are cached per table base path and re-used across invocations. "
+          + "For some internal purposes, we want to avoid using the cached TimeGenerator (like upgrade flows). Hence this config "
+          + "will be used internally during the upgrade flow. Not advisable for end users to use this config. ");
+
   private HoodieTimeGeneratorConfig() {
     super();
   }
@@ -68,6 +77,10 @@ public class HoodieTimeGeneratorConfig extends HoodieConfig {
     return getLong(MAX_EXPECTED_CLOCK_SKEW_MS);
   }
 
+  public boolean canReuseTimeGenerator() {
+    return getBoolean(TIME_GENERATOR_REUSE_ENABLE);
+  }
+
   public String getBasePath() {
     return getString(BASE_PATH);
   }
@@ -108,6 +121,11 @@ public class HoodieTimeGeneratorConfig extends 
HoodieConfig {
       return this;
     }
 
+    public Builder withReuseTimeGenerator(boolean reuseTimeGenerator) {
+      timeGeneratorConfig.setValue(TIME_GENERATOR_REUSE_ENABLE, 
String.valueOf(reuseTimeGenerator));
+      return this;
+    }
+
     public Builder withPath(String basePath) {
       timeGeneratorConfig.setValue(BASE_PATH, basePath);
       return this;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java
index 2dce07dc267..b410af6f453 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java
@@ -42,12 +42,20 @@ public class TimeGenerators {
                                                StorageConfiguration<?> 
storageConf) {
     ValidationUtils.checkState(timeGeneratorConfig.contains(BASE_PATH), 
"Option [" + BASE_PATH.key() + "] is required");
     ValidationUtils.checkArgument(storageConf != null, "Hadoop configuration 
is required");
-    return TIME_GENERATOR_CACHE.get(timeGeneratorConfig.getBasePath(), s -> {
-      TimeGeneratorType type = timeGeneratorConfig.getTimeGeneratorType();
-      if (Objects.requireNonNull(type) == 
TimeGeneratorType.WAIT_TO_ADJUST_SKEW) {
-        return new SkewAdjustingTimeGenerator(timeGeneratorConfig, 
storageConf);
-      }
-      throw new IllegalArgumentException("Unsupported TimeGenerator Type " + 
type);
-    });
+    if (timeGeneratorConfig.canReuseTimeGenerator()) {
+      return TIME_GENERATOR_CACHE.get(timeGeneratorConfig.getBasePath(), s -> 
getNewTimeGenerator(timeGeneratorConfig, storageConf));
+    } else {
+      return getNewTimeGenerator(timeGeneratorConfig, storageConf);
+    }
+  }
+
+  private static TimeGenerator getNewTimeGenerator(HoodieTimeGeneratorConfig 
timeGeneratorConfig,
+                                                   StorageConfiguration<?> 
storageConf) {
+    // Creates a fresh TimeGenerator instance; used both to populate the cache and when reuse is disabled.
+    TimeGeneratorType type = timeGeneratorConfig.getTimeGeneratorType();
+    if (Objects.requireNonNull(type) == TimeGeneratorType.WAIT_TO_ADJUST_SKEW) 
{
+      return new SkewAdjustingTimeGenerator(timeGeneratorConfig, storageConf);
+    }
+    throw new IllegalArgumentException("Unsupported TimeGenerator Type " + 
type);
   }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/client/transaction/lock/TestNoopLockProvider.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/client/transaction/lock/TestNoopLockProvider.java
new file mode 100644
index 00000000000..e27da253eb7
--- /dev/null
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/client/transaction/lock/TestNoopLockProvider.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/**
+ * Tests {@code NoopLockProvider}.
+ */
+public class TestNoopLockProvider {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestNoopLockProvider.class);
+  private final StorageConfiguration<?> storageConf = getDefaultStorageConf();
+  private final LockConfiguration lockConfiguration1;
+  private final LockConfiguration lockConfiguration2;
+
+  public TestNoopLockProvider() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(HoodieCommonConfig.BASE_PATH.key(), "table1");
+    lockConfiguration1 = new LockConfiguration(properties);
+    properties.put(HoodieCommonConfig.BASE_PATH.key(), "table2");
+    lockConfiguration2 = new LockConfiguration(properties);
+  }
+
+  @Test
+  public void testLockAcquisition() {
+    NoopLockProvider noopLockProvider = new 
NoopLockProvider(lockConfiguration1, storageConf);
+    assertDoesNotThrow(() -> {
+      noopLockProvider.lock();
+    });
+    assertDoesNotThrow(() -> {
+      noopLockProvider.unlock();
+    });
+  }
+
+  @Test
+  public void testLockReAcquisitionBySameThread() {
+    NoopLockProvider noopLockProvider = new 
NoopLockProvider(lockConfiguration1, storageConf);
+    assertDoesNotThrow(() -> {
+      noopLockProvider.lock();
+    });
+    assertDoesNotThrow(() -> {
+      noopLockProvider.lock();
+    });
+    assertDoesNotThrow(() -> {
+      noopLockProvider.unlock();
+    });
+    assertDoesNotThrow(() -> {
+      noopLockProvider.lock();
+    });
+  }
+
+  @Test
+  public void testLockReAcquisitionBySameThreadWithTwoTables() {
+    NoopLockProvider noopLockProvider1 = new 
NoopLockProvider(lockConfiguration1, storageConf);
+    NoopLockProvider noopLockProvider2 = new 
NoopLockProvider(lockConfiguration2, storageConf);
+
+    assertDoesNotThrow(() -> {
+      noopLockProvider1.lock();
+    });
+    assertDoesNotThrow(() -> {
+      noopLockProvider2.lock();
+    });
+    assertDoesNotThrow(() -> {
+      noopLockProvider1.lock();
+    });
+    assertDoesNotThrow(() -> {
+      noopLockProvider1.lock();
+    });
+    assertDoesNotThrow(() -> {
+      noopLockProvider1.unlock();
+    });
+    assertDoesNotThrow(() -> {
+      noopLockProvider2.unlock();
+    });
+  }
+
+  @Test
+  public void testLockReAcquisitionByDifferentThread() {
+    NoopLockProvider noopLockProvider = new 
NoopLockProvider(lockConfiguration1, storageConf);
+    final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+
+    // Main test thread
+    assertDoesNotThrow(() -> {
+      noopLockProvider.lock();
+    });
+
+    // Another writer thread in parallel, should be able to acquire the lock 
instantly
+    Thread writer2 = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        assertDoesNotThrow(() -> {
+          noopLockProvider.lock();
+        });
+        assertDoesNotThrow(() -> {
+          noopLockProvider.unlock();
+        });
+        writer2Completed.set(true);
+      }
+    });
+    writer2.start();
+
+    assertDoesNotThrow(() -> {
+      noopLockProvider.unlock();
+    });
+
+    try {
+      writer2.join();
+    } catch (InterruptedException e) {
+      //
+    }
+    Assertions.assertTrue(writer2Completed.get());
+
+    writer2.interrupt();
+  }
+}
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java
index f6e92943cdf..d776229e63e 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java
@@ -27,6 +27,7 @@ import org.apache.hudi.storage.StorageConfiguration;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -34,6 +35,9 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
 public class TestWaitBasedTimeGenerator {
 
   public static class MockInProcessLockProvider extends InProcessLockProvider {
@@ -145,4 +149,29 @@ public class TestWaitBasedTimeGenerator {
       Assertions.assertTrue(t2Timestamp.get() < t1Timestamp.get());
     }
   }
+
+  @Test
+  public void testTimeGeneratorCache() {
+    TimeGenerator timeGenerator1 = 
TimeGenerators.getTimeGenerator(timeGeneratorConfig, storageConf);
+    TimeGenerator timeGenerator2 = 
TimeGenerators.getTimeGenerator(timeGeneratorConfig, storageConf);
+    TimeGenerator timeGenerator3 = 
TimeGenerators.getTimeGenerator(timeGeneratorConfig, storageConf);
+
+    assertEquals(timeGenerator1, timeGenerator2);
+    assertEquals(timeGenerator1, timeGenerator3);
+
+    // disable reuse
+    HoodieTimeGeneratorConfig timeGeneratorConfigWithNoReuse = 
HoodieTimeGeneratorConfig.newBuilder()
+        .withPath("test_wait_based")
+        .withMaxExpectedClockSkewMs(25L)
+        .withReuseTimeGenerator(false)
+        .withTimeGeneratorType(TimeGeneratorType.WAIT_TO_ADJUST_SKEW)
+        .build();
+
+    TimeGenerator timeGenerator4 = 
TimeGenerators.getTimeGenerator(timeGeneratorConfigWithNoReuse, storageConf);
+    assertNotEquals(timeGenerator1, timeGenerator4);
+    // how many ever times we call, we should get new time generator
+    TimeGenerator timeGenerator5 = 
TimeGenerators.getTimeGenerator(timeGeneratorConfigWithNoReuse, storageConf);
+    assertNotEquals(timeGenerator4, timeGenerator5);
+  }
+
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
index 862cd4f9ae9..5779bdeb93f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
@@ -22,21 +22,30 @@ import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.common.config.RecordMergeMode
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion}
+import org.apache.hudi.config.HoodieLockConfig
 import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, 
UpgradeDowngrade}
 import org.apache.spark.sql.SaveMode
 import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
 
 class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
 
   @ParameterizedTest
-  @EnumSource(classOf[HoodieTableType])
-  def testPartitionFieldsWithUpgrade(tableType: HoodieTableType): Unit = {
+  @CsvSource(value = Array(
+    "COPY_ON_WRITE,null",
+    
"COPY_ON_WRITE,org.apache.hudi.client.transaction.lock.InProcessLockProvider",
+    "COPY_ON_WRITE,org.apache.hudi.client.transaction.lock.NoopLockProvider",
+    "MERGE_ON_READ,null",
+    
"MERGE_ON_READ,org.apache.hudi.client.transaction.lock.InProcessLockProvider",
+    "MERGE_ON_READ,org.apache.hudi.client.transaction.lock.NoopLockProvider"
+  ))
+  def testPartitionFieldsWithUpgrade(tableType: HoodieTableType, 
lockProviderClass: String): Unit = {
     val partitionFields = "partition:simple"
     // Downgrade handling for metadata not yet ready.
-    val hudiOpts = commonOpts ++ Map(
+    val hudiOptsWithoutLockConfigs = commonOpts ++ Map(
       DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
KeyGeneratorType.CUSTOM.getClassName,
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> partitionFields,
@@ -45,6 +54,12 @@ class TestSevenToEightUpgrade extends 
RecordLevelIndexTestBase {
       DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> 
classOf[OverwriteWithLatestAvroPayload].getName,
       DataSourceWriteOptions.RECORD_MERGE_MODE.key -> 
RecordMergeMode.COMMIT_TIME_ORDERING.name)
 
+    val hudiOpts = if (!lockProviderClass.equals("null")) {
+      hudiOptsWithoutLockConfigs ++ 
Map(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> lockProviderClass)
+    } else {
+      hudiOptsWithoutLockConfigs
+    }
+
     doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Overwrite,

Reply via email to