rafcis02 commented on issue #4552:
URL: https://github.com/apache/hudi/issues/4552#issuecomment-1024142956
I've tried it for BULK_INSERT and UPSERT as well, but nothing works for me.
I prepared a sample test job so you can reproduce it or just review it
(I hope I just misconfigured it :grinning:).
The upsert operation corrupts timestamps whether or not I set this option;
I tested it with both the DataFrame writer and SQL MERGE INTO.
AWS Glue 2.0 (Spark 2.4)
Hudi 0.10.1
```scala
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions.{current_timestamp, expr, lit}
import org.apache.spark.sql.hudi.HoodieOptionConfig.{SQL_KEY_PRECOMBINE_FIELD, SQL_KEY_TABLE_PRIMARY_KEY, SQL_KEY_TABLE_TYPE}
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSparkSessionExtension}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.JavaConverters.mapAsJavaMapConverter

case class HudiTableOptions(tableName: String,
                            databaseName: String,
                            operationType: String,
                            partitionColumns: List[String],
                            recordKeyColumns: List[String],
                            preCombineColumn: String) {

  // Core writer options, including the consistent-logical-timestamp flag under test.
  def hudiTableOptions = Map(
    TABLE_TYPE.key() -> COW_TABLE_TYPE_OPT_VAL,
    OPERATION.key() -> operationType,
    TBL_NAME.key() -> tableName,
    RECORDKEY_FIELD.key() -> recordKeyColumns.mkString(","),
    PARTITIONPATH_FIELD.key() -> partitionColumns.mkString(","),
    KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getName,
    PRECOMBINE_FIELD.key() -> preCombineColumn,
    URL_ENCODE_PARTITIONING.key() -> false.toString,
    KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key() -> true.toString
  )

  // Serde properties so Spark SQL (MERGE INTO) recognizes the synced table as a Hudi table.
  def hiveTableProperties = Map(
    SQL_KEY_TABLE_TYPE.sqlKeyName -> HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW,
    SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName -> hudiTableOptions(RECORDKEY_FIELD.key()),
    SQL_KEY_PRECOMBINE_FIELD.sqlKeyName -> hudiTableOptions(PRECOMBINE_FIELD.key())
  )

  // Hive sync options (Glue Data Catalog through HMS sync mode).
  def hiveTableOptions = Map(
    HIVE_SYNC_MODE.key() -> HiveSyncMode.HMS.name(),
    HIVE_SYNC_ENABLED.key() -> true.toString,
    HIVE_DATABASE.key() -> databaseName,
    HIVE_TABLE.key() -> hudiTableOptions(TBL_NAME.key()),
    HIVE_PARTITION_FIELDS.key() -> hudiTableOptions(PARTITIONPATH_FIELD.key()),
    HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[MultiPartKeysValueExtractor].getName,
    HIVE_STYLE_PARTITIONING.key() -> true.toString,
    HIVE_SUPPORT_TIMESTAMP_TYPE.key() -> true.toString,
    HIVE_TABLE_SERDE_PROPERTIES.key() -> ConfigUtils.configToString(hiveTableProperties.asJava)
  )

  def writerOptions: Map[String, String] = hudiTableOptions ++ hiveTableOptions
}

object GlueApp {

  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.hudi").setLevel(Level.INFO)

    val spark: SparkSession = SparkSession.builder()
      .appName("Test")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("hive.metastore.glue.catalogid", "****")
      .withExtensions(new HoodieSparkSessionExtension())
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    val tableName = "hudi_test"
    val databaseName = "test_database"
    val path = "s3://s3bucketname/hudi-test/"
    val tableOptions = HudiTableOptions(tableName, databaseName,
      BULK_INSERT_OPERATION_OPT_VAL, List("year"), List("id"), "ts")

    val dataFrameForBulkInsert = Range(0, 10000).toDF("id")
      .withColumn("year", lit(2022))
      .withColumn("ts", current_timestamp() - expr("INTERVAL 100 DAYS"))
      .withColumn("other", current_timestamp() - expr("INTERVAL 150 DAYS"))

    // Keys 5000-9999 overlap with the bulk insert, so the upsert updates half the records.
    val dataFrameForUpsert = Range(5000, 15000).toDF("id")
      .withColumn("year", lit(2022))
      .withColumn("ts", current_timestamp() - expr("INTERVAL 10 DAYS"))
      .withColumn("other", current_timestamp() - expr("INTERVAL 50 DAYS"))

    // ------------------------ BULK INSERT ------------------------------------------
    dataFrameForBulkInsert.write
      .format("org.apache.hudi")
      .options(tableOptions.writerOptions)
      .mode(SaveMode.Overwrite)
      .save(path)
    // --------------------------------------------------------------------------------

    Thread.sleep(10 * 1000)

    // --------------------- UPSERT Spark DataFrame Writer ---------------------------
    // dataFrameForUpsert
    //   .write
    //   .format("org.apache.hudi")
    //   .options(tableOptions.copy(operationType = UPSERT_OPERATION_OPT_VAL).writerOptions)
    //   .mode(SaveMode.Append)
    //   .save(path)
    // --------------------------------------------------------------------------------

    // --------------------- UPSERT SQL MERGE INTO -----------------------------------
    val mergeIntoStatement =
      s"""MERGE INTO $databaseName.$tableName AS t
         | USING source_data_set s
         | ON t.id = s.id
         | WHEN MATCHED THEN UPDATE SET *
         | WHEN NOT MATCHED THEN INSERT *
         |""".stripMargin
    dataFrameForUpsert.createTempView("source_data_set")
    spark.sql(mergeIntoStatement)
    // --------------------------------------------------------------------------------
  }
}
```
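
To see the corruption, one can read the table back after each write and compare `ts`/`other` for an overlapping key, e.g. id 5000. This is just a minimal sketch, assuming the same `spark` session, `path`, `databaseName`, and `tableName` as in `main` above:

```scala
// Sketch: read the Hudi table back from the base path and inspect the
// logical timestamps for an id that overlaps both writes.
// Assumes the same `spark`, `path`, `databaseName`, `tableName` as above.
spark.read
  .format("org.apache.hudi")
  .load(path)
  .filter("id = 5000")
  .select("id", "ts", "other")
  .show(false)

// The same row through the Hive-synced table, i.e. how Spark SQL / Athena sees it:
spark.sql(s"SELECT id, ts, other FROM $databaseName.$tableName WHERE id = 5000")
  .show(false)
```

After the bulk insert, both queries should show `ts` about 100 days in the past; after the upsert, the values come back corrupted regardless of the `KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED` setting.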