This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.10.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 342acbee33b0f76f7b3a3b328e2e988cc5fdef8f
Author: Yann Byron <[email protected]>
AuthorDate: Sat Dec 4 00:13:38 2021 +0800

    Revert "[HUDI-2495] Resolve inconsistent key generation for timestamp types by GenericRecord and Row (#3944)" (#4201)
    
    (cherry picked from commit 2f96f4300b37207703b477979f2461bdd294ccf9)
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |   3 -
 .../spark/sql/hudi/command/SqlKeyGenerator.scala   |   3 +-
 .../hudi/TestGenericRecordAndRowConsistency.scala  | 104 ---------------------
 3 files changed, 1 insertion(+), 109 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ff8aefe..fdaa466 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -59,7 +59,6 @@ import java.io.OutputStream;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -542,8 +541,6 @@ public class HoodieAvroUtils {
   private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, 
Object fieldValue) {
     if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
       return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
-    } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMicros()) 
{
-      return new Timestamp(Long.parseLong(fieldValue.toString()) / 1000);
     } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
       Decimal dc = (Decimal) fieldSchema.getLogicalType();
       DecimalConversion decimalConversion = new DecimalConversion();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 2cc3ece..e069df9 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hudi.command
 
-import java.sql.Timestamp
 import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
 
 import org.apache.avro.generic.GenericRecord
@@ -97,7 +96,7 @@ class SqlKeyGenerator(props: TypedProperties) extends 
ComplexKeyGenerator(props)
                 val timeMs = if (rowType) { // In RowType, the 
partitionPathValue is the time format string, convert to millis
                   
SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
                 } else {
-                  Timestamp.valueOf(_partitionValue).getTime
+                  MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
                 }
                 val timestampFormat = PartitionPathEncodeUtils.escapePathName(
                     SqlKeyGenerator.timestampTimeFormat.print(timeMs))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
deleted file mode 100644
index 985bf2e..0000000
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
-import org.apache.spark.sql.DataFrame
-import org.junit.jupiter.api.Test
-
-import java.sql.{Date, Timestamp}
-
-class TestGenericRecordAndRowConsistency extends 
SparkClientFunctionalTestHarness {
-
-  val commonOpts = Map(
-    HoodieWriteConfig.TBL_NAME.key -> "hoodie_type_consistency_tbl",
-    "hoodie.insert.shuffle.parallelism" -> "1",
-    "hoodie.upsert.shuffle.parallelism" -> "1",
-    DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
-    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "str,eventTime",
-    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "typeId",
-    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "typeId",
-    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.ComplexKeyGenerator"
-  )
-
-  @Test
-  def testTimestampTypeConsistency(): Unit = {
-    val _spark = spark
-    import _spark.implicits._
-
-    val df = Seq(
-      (1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"),
-      (1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
-      (2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
-      (2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
-    ).toDF("typeId", "eventTime", "str")
-
-    testConsistencyBetweenGenericRecordAndRow(df)
-  }
-
-  @Test
-  def testDateTypeConsistency(): Unit = {
-    val _spark = spark
-    import _spark.implicits._
-
-    val df = Seq(
-      (1, Date.valueOf("2014-01-01"), "abc"),
-      (1, Date.valueOf("2014-11-30"), "abc"),
-      (2, Date.valueOf("2016-12-29"), "def"),
-      (2, Date.valueOf("2016-05-09"), "def")
-    ).toDF("typeId", "eventTime", "str")
-
-    testConsistencyBetweenGenericRecordAndRow(df)
-  }
-
-  private def testConsistencyBetweenGenericRecordAndRow(df: DataFrame): Unit = 
{
-    val _spark = spark
-    import _spark.implicits._
-
-    // upsert operation generate recordKey by GenericRecord
-    val tempRecordPath = basePath + "/record_tbl/"
-    df.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION.key, "upsert")
-      .mode(org.apache.spark.sql.SaveMode.Overwrite)
-      .save(tempRecordPath)
-
-    val data1 = spark.read.format("hudi")
-      .load(tempRecordPath)
-      .select("_hoodie_record_key")
-      .map(_.toString()).collect().sorted
-
-    // bulk_insert operation generate recordKey by Row
-    val tempRowPath = basePath + "/row_tbl/"
-    df.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION.key, "bulk_insert")
-      .mode(org.apache.spark.sql.SaveMode.Overwrite)
-      .save(tempRowPath)
-
-    val data2 = spark.read.format("hudi")
-      .load(tempRowPath)
-      .select("_hoodie_record_key")
-      .map(_.toString()).collect().sorted
-
-    assert(data1 sameElements data2)
-  }
-
-}

Reply via email to