This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5ec6bcf336f [HUDI-8235] Adding support for EPOCHMICROSECONDS in
TimestampBasedAvroKeyGenerator (#7913)
5ec6bcf336f is described below
commit 5ec6bcf336ff62c923b706cec03611e1fde188f4
Author: Sydney Beal <[email protected]>
AuthorDate: Sun Sep 22 18:40:56 2024 -0500
[HUDI-8235] Adding support for EPOCHMICROSECONDS in
TimestampBasedAvroKeyGenerator (#7913)
Co-authored-by: Sydney Beal <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../keygen/TimestampBasedAvroKeyGenerator.java | 6 +++++-
.../common/config/TimestampKeyGeneratorConfig.java | 3 ++-
.../apache/hudi/SparkHoodieTableFileIndex.scala | 6 ++++--
.../keygen/TestTimestampBasedKeyGenerator.java | 25 ++++++++++++++++++++++
4 files changed, 36 insertions(+), 4 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
index 1990b2dab44..ea2e0911d30 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
@@ -41,6 +41,7 @@ import java.time.LocalDate;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.DATE_TIME_PARSER;
@@ -54,7 +55,7 @@ import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
*/
public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
public enum TimestampType implements Serializable {
- UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR
+ UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, EPOCHMICROSECONDS,
SCALAR
}
private final TimeUnit timeUnit;
@@ -93,6 +94,9 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
case EPOCHMILLISECONDS:
timeUnit = MILLISECONDS;
break;
+ case EPOCHMICROSECONDS:
+ timeUnit = MICROSECONDS;
+ break;
case UNIX_TIMESTAMP:
timeUnit = SECONDS;
break;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/TimestampKeyGeneratorConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/TimestampKeyGeneratorConfig.java
index 46b66371b31..367b7593ca6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/TimestampKeyGeneratorConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/TimestampKeyGeneratorConfig.java
@@ -42,7 +42,8 @@ public class TimestampKeyGeneratorConfig extends HoodieConfig {
.withAlternatives(OLD_TIMESTAMP_KEYGEN_CONFIG_PREFIX + "timestamp.type")
.markAdvanced()
.withDocumentation("Timestamp type of the field, which should be one of
the timestamp types "
- + "supported: `UNIX_TIMESTAMP`, `DATE_STRING`, `MIXED`,
`EPOCHMILLISECONDS`, `SCALAR`.");
+ + "supported: `UNIX_TIMESTAMP`, `DATE_STRING`, `MIXED`,
`EPOCHMILLISECONDS`,"
+ + " `EPOCHMICROSECONDS`, `SCALAR`.");
public static final ConfigProperty<String> INPUT_TIME_UNIT = ConfigProperty
.key(TIMESTAMP_KEYGEN_CONFIG_PREFIX + "timestamp.scalar.time.unit")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index c6d24974b07..e394c030df0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -371,8 +371,10 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
val tableConfig = metaClient.getTableConfig
if (null != tableConfig.getKeyGeneratorClassName
&&
tableConfig.getKeyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP.getClassName)
- &&
tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key()).matches("SCALAR|UNIX_TIMESTAMP|EPOCHMILLISECONDS"))
{
- // For TIMESTAMP key generator when TYPE is SCALAR, UNIX_TIMESTAMP or
EPOCHMILLISECONDS,
+ &&
tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
+ .matches("SCALAR|UNIX_TIMESTAMP|EPOCHMILLISECONDS|EPOCHMICROSECONDS")) {
+ // For TIMESTAMP key generator when TYPE is SCALAR, UNIX_TIMESTAMP,
+ // EPOCHMILLISECONDS, or EPOCHMICROSECONDS,
// we couldn't reconstruct initial partition column values from
partition paths due to lost data after formatting in most cases.
// But the output for these cases is in a string format, so we can pass
partitionPath as UTF8String
Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
index ec93ea229ac..d5618fbe8ab 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
@@ -209,6 +209,31 @@ public class TestTimestampBasedKeyGenerator {
assertEquals("1970-01-01 12:00:00", keyGen.getPartitionPath(baseRow));
internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
assertEquals(UTF8String.fromString("1970-01-01 12:00:00"),
keyGen.getPartitionPath(internalRow, baseRow.schema()));
+
+ // Timestamp field is in long type, with `EPOCHMICROSECONDS` timestamp
type in the key generator
+ baseRecord.put("createTime", 1578283932123456L);
+ properties = getBaseKeyConfig("createTime", "EPOCHMICROSECONDS",
"yyyy-MM-dd hh", "GMT+8:00", null);
+ keyGen = new TimestampBasedKeyGenerator(properties);
+ HoodieKey key = keyGen.getKey(baseRecord);
+ assertEquals("2020-01-06 12", key.getPartitionPath());
+ baseRow = genericRecordToRow(baseRecord);
+ assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow));
+ internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
+ assertEquals(UTF8String.fromString("2020-01-06 12"),
keyGen.getPartitionPath(internalRow, baseRow.schema()));
+
+ // Timestamp field is in decimal type, with `EPOCHMICROSECONDS` timestamp
type in the key generator
+ decimal = new BigDecimal("1578283932123456.0001");
+ resolvedNullableSchema = AvroSchemaUtils.resolveNullableSchema(
+ schema.getField("createTimeDecimal").schema());
+ avroDecimal = conversion.toFixed(decimal, resolvedNullableSchema,
LogicalTypes.decimal(20, 4));
+ baseRecord.put("createTimeDecimal", avroDecimal);
+ properties = getBaseKeyConfig(
+ "createTimeDecimal", "EPOCHMICROSECONDS", "yyyy-MM-dd hh", "GMT+8:00",
null);
+ keyGen = new TimestampBasedKeyGenerator(properties);
+ bigDecimalKey = keyGen.getKey(baseRecord);
+ assertEquals("2020-01-06 12", bigDecimalKey.getPartitionPath());
+ baseRow = genericRecordToRow(baseRecord);
+ assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow));
}
@Test