This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch partition-bucket-index in repository https://gitbox.apache.org/repos/asf/hudi.git
commit fa751873d841ea8823d96bb43dd2601342f79c3b Author: Sivabalan Narayanan <[email protected]> AuthorDate: Mon Mar 17 14:39:20 2025 -0700 [HUDI-9127] Fixing completion time generation to honor timezone from table config (#12926) Co-authored-by: Y Ethan Guo <[email protected]> --- .../hudi/client/TestSparkRDDWriteClient.java | 36 ++++++++++++++++++++++ .../hudi/common/table/HoodieTableMetaClient.java | 3 ++ .../table/timeline/HoodieInstantTimeGenerator.java | 2 +- .../timeline/versioning/v2/ActiveTimelineV2.java | 4 +-- .../common/table/TestHoodieTableMetaClient.java | 34 ++++++++++++++++++++ 5 files changed, 75 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java index 4bc7723c69d..6177a133a8b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java @@ -24,7 +24,9 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.InstantComparison; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; @@ -35,6 +37,7 @@ import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; @@ -190,4 +193,37 @@ class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness { } } } + + @Test + public void testCompletionTimeGreaterThanRequestedTime() throws IOException { + String basePath = URI.create(basePath()).getPath(); + testAndAssertCompletionIsEarlierThanRequested(basePath, new Properties()); + + // retry w/ explicitly setting timezone to UTC + basePath = basePath + "_UTC"; + Properties props = new Properties(); + props.setProperty(HoodieTableConfig.TIMELINE_TIMEZONE.key(), "UTC"); + testAndAssertCompletionIsEarlierThanRequested(basePath, props); + } + + private void testAndAssertCompletionIsEarlierThanRequested(String basePath, Properties properties) throws IOException { + HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), basePath, properties); + + HoodieWriteConfig cfg = getConfigBuilder(true).build(); + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + for (int i = 0; i < 10; i++) { + String requestedTime = client.startCommit(); + List<HoodieRecord> records = dataGen.generateInserts(requestedTime, 200); + JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1); + client.upsert(writeRecords, requestedTime); + } + } + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(context().getStorageConf()).build(); + metaClient.reloadActiveTimeline(); + metaClient.getActiveTimeline().filterCompletedInstants().getInstants().forEach(hoodieInstant -> { + assertTrue(InstantComparison.compareTimestamps(hoodieInstant.requestedTime(), InstantComparison.LESSER_THAN, hoodieInstant.getCompletionTime())); + }); + } + } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index d50e9f4b7cd..a55834b20a3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -1331,6 +1331,9 @@ public class HoodieTableMetaClient implements Serializable { if (hoodieConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)) { setRecordKeyFields(hoodieConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)); } + if (hoodieConfig.contains(HoodieTableConfig.TIMELINE_TIMEZONE)) { + setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))); + } if (hoodieConfig.contains(HoodieTableConfig.CDC_ENABLED)) { setCDCEnabled(hoodieConfig.getBoolean(HoodieTableConfig.CDC_ENABLED)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java index f1fa03095d9..708b993a138 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java @@ -48,7 +48,7 @@ public class HoodieInstantTimeGenerator { public static final int MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH = MILLIS_INSTANT_TIMESTAMP_FORMAT.length(); // Formatter to generate Instant timestamps // Unfortunately millisecond format is not parsable as is https://bugs.openjdk.java.net/browse/JDK-8031085. hence have to do appendValue() - private static final DateTimeFormatter MILLIS_INSTANT_TIME_FORMATTER = new DateTimeFormatterBuilder().appendPattern(SECS_INSTANT_TIMESTAMP_FORMAT) + public static final DateTimeFormatter MILLIS_INSTANT_TIME_FORMATTER = new DateTimeFormatterBuilder().appendPattern(SECS_INSTANT_TIMESTAMP_FORMAT) .appendValue(ChronoField.MILLI_OF_SECOND, 3).toFormatter(); private static final String MILLIS_GRANULARITY_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; private static final DateTimeFormatter MILLIS_GRANULARITY_DATE_FORMATTER = DateTimeFormatter.ofPattern(MILLIS_GRANULARITY_DATE_FORMAT); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java index c44a6b1fd28..dd5c42cddc9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java @@ -33,7 +33,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstantReader; -import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.InstantFileNameGenerator; import org.apache.hudi.common.table.timeline.TimeGenerator; @@ -57,7 +56,6 @@ import java.io.InputStream; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.Date; import java.util.HashSet; import java.util.Objects; import java.util.Set; @@ -703,7 +701,7 @@ public class ActiveTimelineV2 extends BaseTimelineV2 implements HoodieActiveTime TimeGenerator timeGenerator = TimeGenerators .getTimeGenerator(metaClient.getTimeGeneratorConfig(), metaClient.getStorageConf()); timeGenerator.consumeTime(!shouldLock, currentTimeMillis -> { - String completionTime = HoodieInstantTimeGenerator.formatDate(new Date(currentTimeMillis)); + String completionTime = TimelineUtils.generateInstantTime(false, timeGenerator); String fileName = instantFileNameGenerator.getFileName(completionTime, instant); StoragePath fullPath = getInstantFileNamePath(fileName); if (metaClient.getTimelineLayoutVersion().isNullVersion()) { diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java index 3df19ac4e9d..90905f4b6fe 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieIndexDefinition; import org.apache.hudi.common.model.HoodieIndexMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieTimelineTimeZone; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -40,10 +41,13 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; +import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_TIME_FORMATTER; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -124,6 +128,36 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness { assertEquals("val", metaClient.getActiveTimeline().readCommitMetadata(completedInstant).getExtraMetadata().get("key")); } + @Test + public void testCreateNewInstantTimes() throws IOException { + List<String> instantTimesSoFar = new ArrayList<>(); + // explicitly set timezone to UTC and generate timestamps + Properties properties = new Properties(); + properties.setProperty(HoodieTableConfig.TIMELINE_TIMEZONE.key(), "UTC"); + metaClient = HoodieTestUtils.init(metaClient.getStorageConf(), basePath, HoodieTableType.MERGE_ON_READ, properties); + + // run for few iterations + for (int j = 0; j < 5; j++) { + instantTimesSoFar.clear(); + // Generate an instant time in UTC and validate that all instants generated using metaClient are within few seconds apart. + String newCommitTimeInUTC = getNewInstantTimeInUTC(); + + // new instant that we generate below should be within few seconds apart compared to above time we generated. If not, the time zone is not honored + for (int i = 0; i < 10; i++) { + String newInstantTime = metaClient.createNewInstantTime(false); + assertTrue(!instantTimesSoFar.contains(newInstantTime)); + instantTimesSoFar.add(newInstantTime); + assertTrue((Long.parseLong(newInstantTime) - Long.parseLong(newCommitTimeInUTC)) < 10000L); + } + } + } + + private String getNewInstantTimeInUTC() { + Date d = new Date(System.currentTimeMillis()); + return d.toInstant().atZone(HoodieTimelineTimeZone.UTC.getZoneId()) + .toLocalDateTime().format(MILLIS_INSTANT_TIME_FORMATTER); + } + @Test public void testEquals() throws IOException { HoodieTableMetaClient metaClient1 = HoodieTestUtils.init(tempDir.toAbsolutePath().toString(), getTableType());
