This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5faa36f99ca [HUDI-4113] Fix cannot parse <null> schema when use spark delete sql (#5610)
5faa36f99ca is described below
commit 5faa36f99cac3edbe4000c5d67fbd74b9c74d252
Author: KnightChess <[email protected]>
AuthorDate: Tue Dec 13 23:02:09 2022 +0800
[HUDI-4113] Fix cannot parse <null> schema when use spark delete sql (#5610)
- Alter table drop partition does not add a schema to the instant.
A subsequent delete SQL statement then reads the schema from the
latest instant and gets "". This PR fixes the parsing of a null or
empty schema.
Co-authored-by: Sagar Sumit <[email protected]>
---
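For context, a minimal sketch of the failing sequence, with an illustrative
table name and columns (not taken from this patch):

    // Hypothetical reproduction of the bug: the drop-partition commit leaves
    // an empty schema in the latest instant's extra metadata, so the delete
    // below failed with "cannot parse <null> schema" before this fix.
    spark.sql(
      """create table t (id int, ts long, dt string) using hudi
        | tblproperties (primaryKey = 'id', preCombineField = 'ts')
        | partitioned by (dt)
      """.stripMargin)
    spark.sql("insert into t values (1, 1000, '01'), (2, 1000, '02')")
    spark.sql("alter table t drop partition (dt = '01')")
    spark.sql("delete from t where dt = '02'")
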
.../apache/hudi/client/BaseHoodieWriteClient.java | 4 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +-
.../sql/hudi/TestAlterTableDropPartition.scala | 62 +++++++++++++++++++++-
3 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 45d1b200f7a..cb7eb1147ca 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -63,6 +63,7 @@ import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
@@ -1605,7 +1606,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
if (lastInstant.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
activeTimeline.getInstantDetails(lastInstant.get()).get(),
HoodieCommitMetadata.class);
- if (commitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY)) {
+ String extraSchema = commitMetadata.getExtraMetadata().get(SCHEMA_KEY);
+ if (!StringUtils.isNullOrEmpty(extraSchema)) {
config.setSchema(commitMetadata.getExtraMetadata().get(SCHEMA_KEY));
} else {
throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
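
The new guard relies on StringUtils.isNullOrEmpty, so a missing SCHEMA_KEY
entry (null) and the empty schema written by a drop-partition commit are
treated alike; a small sketch of the behavior being assumed:

    import org.apache.hudi.common.util.StringUtils
    // Both a missing extraMetadata entry and an empty schema string fall
    // through to the HoodieIOException branch above.
    assert(StringUtils.isNullOrEmpty(null))  // SCHEMA_KEY absent
    assert(StringUtils.isNullOrEmpty(""))    // schema recorded as ""
    assert(!StringUtils.isNullOrEmpty("{\"type\":\"record\"}"))  // real schema passes
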
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 06f85fca022..f761090d492 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -284,8 +284,11 @@ object HoodieSparkSqlWriter {
}
// Create a HoodieWriteClient & issue the delete.
+ val tableMetaClient = HoodieTableMetaClient.builder
+   .setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
+ val schemaStr = new TableSchemaResolver(tableMetaClient).getTableAvroSchema.toString
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
- null, path, tblName,
+ schemaStr, path, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
// Issue delete partitions
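
Instead of passing null, the writer now resolves the schema from the table's
own metadata; a standalone sketch of that lookup (the base path is
illustrative):

    import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
    // Build a meta client for the table and resolve its Avro schema directly,
    // rather than trusting whatever the latest instant recorded.
    val metaClient = HoodieTableMetaClient.builder
      .setConf(spark.sparkContext.hadoopConfiguration)
      .setBasePath("/tmp/hudi/t")  // illustrative base path
      .build()
    val schemaStr = new TableSchemaResolver(metaClient).getTableAvroSchema.toString
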
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index e063f67d8c0..15b14ec77f5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -19,10 +19,13 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.util.PartitionPathEncodeUtils
+import org.apache.hudi.common.model.HoodieCommitMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions
class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
@@ -336,4 +339,61 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("check instance schema") {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by (dt)
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, '01'), " +
+ s"(2, 'a2', 10, 1000, '02'), (3, 'a3', 10, 1000, '03')")
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "01"),
+ Seq(2, "a2", 10.0, 1000, "02"),
+ Seq(3, "a3", 10.0, 1000, "03")
+ )
+
+ // drop partition
+ spark.sql(s"alter table $tableName drop partition (dt = '01')")
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(2, "a2", 10.0, 1000, "02"),
+ Seq(3, "a3", 10.0, 1000, "03")
+ )
+
+ // check schema
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ val metaClient = HoodieTableMetaClient.builder().setBasePath(s"${tmp.getCanonicalPath}/$tableName")
+   .setConf(hadoopConf).build()
+ val lastInstant = metaClient.getActiveTimeline.lastInstant()
+ val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(
+   lastInstant.get()).get(), classOf[HoodieCommitMetadata])
+ val schemaStr = commitMetadata.getExtraMetadata.get(HoodieCommitMetadata.SCHEMA_KEY)
+ Assertions.assertFalse(StringUtils.isNullOrEmpty(schemaStr))
+
+ // delete
+ spark.sql(s"delete from $tableName where dt = '02'")
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(3, "a3", 10.0, 1000, "03")
+ )
+ }
+ }
+ }
}