This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b181063ddd1 [HUDI-7040] Handle dropping of partition columns in BulkInsertDataInternalWriterHelper::write(...) (#10272)
b181063ddd1 is described below
commit b181063ddd13583ee2c6fa862bb476f3a7cbf1ab
Author: bhat-vinay <[email protected]>
AuthorDate: Mon Dec 11 22:08:30 2023 +0530
[HUDI-7040] Handle dropping of partition columns in BulkInsertDataInternalWriterHelper::write(...) (#10272)
Issue:
There are two configs which, when set together, cause exceptions or
assertion failures:
1. Config to disable populating metadata fields (for each row)
2. Config to drop partition columns (to save storage space) from a row
With #1 and #2, partition paths cannot be deduced from the partition
columns (as the partition columns are dropped higher up the stack).
BulkInsertDataInternalWriterHelper::write(...) relied on metadata fields
to extract the partition path in such cases, but with #1 those fields are
absent, resulting in asserts/exceptions.
The fix is to push the dropping of partition columns down the stack, to
after the partition path has been computed. It manipulates the raw
'InternalRow' structure by copying only the relevant fields into a new
'InternalRow'. Each row is processed individually to drop the partition
columns and copy the remaining fields into a new 'InternalRow'.
Co-authored-by: Vinaykumar Bhat <[email protected]>
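
A minimal sketch of the per-row transformation described above (illustrative
only, not the committed code; the class and method names, the signature, and
the use of GenericInternalRow are assumptions made for this example):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.sql.types.StructType;

    class DropPartitionColumnsSketch {
      // 'partitionCols' stands in for the partition path columns resolved from the write config.
      static InternalRow dropColumns(InternalRow row, StructType schema, List<String> partitionCols) {
        boolean[] isPartitionField = new boolean[schema.length()];
        for (String col : partitionCols) {
          isPartitionField[schema.fieldIndex(col)] = true;
        }
        List<Object> kept = new ArrayList<>();
        for (int i = 0; i < schema.length(); i++) {
          if (!isPartitionField[i]) {
            // Fields are read in schema order, so the relative column ordering is preserved.
            kept.add(row.get(i, schema.fields()[i].dataType()));
          }
        }
        return new GenericInternalRow(kept.toArray());
      }
    }

The committed change achieves the same effect via row.toSeq(structType) and
InternalRow.fromSeq(...), as shown in the BulkInsertDataInternalWriterHelper
diff below.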
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 ++
.../commit/BulkInsertDataInternalWriterHelper.java | 34 ++++++++++++++-
.../hudi/HoodieDatasetBulkInsertHelper.scala | 31 +++++---------
.../BaseDatasetBulkInsertCommitActionExecutor.java | 3 +-
.../TestHoodieDatasetBulkInsertHelper.java | 12 +++---
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 48 +++++++++++++++++++++-
6 files changed, 101 insertions(+), 31 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 51895751101..7a2f2427af8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1390,6 +1390,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(ALLOW_MULTI_WRITE_ON_SAME_INSTANT_ENABLE);
}
+ public boolean shouldDropPartitionColumns() {
+ return getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS);
+ }
+
public String getWriteStatusClassName() {
return getString(WRITE_STATUS_CLASS_NAME);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 7f6054b2296..0773e8a5a0a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.action.commit;
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
@@ -38,11 +39,16 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
/**
* Helper class for HoodieBulkInsertDataInternalWriter used by Spark datasource v2.
*/
@@ -124,7 +130,33 @@ public class BulkInsertDataInternalWriterHelper {
lastKnownPartitionPath = partitionPath.clone();
}
- handle.write(row);
+ boolean shouldDropPartitionColumns = writeConfig.shouldDropPartitionColumns();
+ if (shouldDropPartitionColumns) {
+ // Drop the partition columns from the row
+ // Using the deprecated JavaConversions to be compatible with scala versions < 2.12. Once hudi support for scala versions < 2.12 is
+ // stopped, can move this to JavaConverters.seqAsJavaList(...)
+ List<String> partitionCols = JavaConversions.<String>seqAsJavaList(HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
+ Set<Integer> partitionIdx = new HashSet<Integer>();
+ for (String col : partitionCols) {
+ partitionIdx.add(this.structType.fieldIndex(col));
+ }
+
+ // Relies on InternalRow::toSeq(...) preserving the column ordering based on the supplied schema
+ // Using the deprecated JavaConversions to be compatible with scala versions < 2.12.
+ List<Object> cols = JavaConversions.<Object>seqAsJavaList(row.toSeq(structType));
+ int idx = 0;
+ List<Object> newCols = new ArrayList<Object>();
+ for (Object o : cols) {
+ if (!partitionIdx.contains(idx)) {
+ newCols.add(o);
+ }
+ idx += 1;
+ }
+ InternalRow newRow = InternalRow.fromSeq(JavaConverters.<Object>asScalaIteratorConverter(newCols.iterator()).asScala().toSeq());
+ handle.write(newRow);
+ } else {
+ handle.write(row);
+ }
} catch (Throwable t) {
LOG.error("Global error thrown while trying to write records in
HoodieRowCreateHandle ", t);
throw t;
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 12e446d7be6..75ec069946d 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -62,7 +62,6 @@ object HoodieDatasetBulkInsertHelper
def prepareForBulkInsert(df: DataFrame,
config: HoodieWriteConfig,
partitioner: BulkInsertPartitioner[Dataset[Row]],
- shouldDropPartitionColumns: Boolean,
instantTime: String): Dataset[Row] = {
val populateMetaFields = config.populateMetaFields()
val schema = df.schema
@@ -128,16 +127,10 @@ object HoodieDatasetBulkInsertHelper
HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery)
}
- val trimmedDF = if (shouldDropPartitionColumns) {
- dropPartitionColumns(updatedDF, config)
- } else {
- updatedDF
- }
-
val targetParallelism =
- deduceShuffleParallelism(trimmedDF, config.getBulkInsertShuffleParallelism)
+ deduceShuffleParallelism(updatedDF, config.getBulkInsertShuffleParallelism)
- partitioner.repartitionRecords(trimmedDF, targetParallelism)
+ partitioner.repartitionRecords(updatedDF, targetParallelism)
}
/**
@@ -243,21 +236,17 @@ object HoodieDatasetBulkInsertHelper
}
}
- private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
- val partitionPathFields = getPartitionPathFields(config).toSet
- val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
- if (nestedPartitionPathFields.nonEmpty) {
- logWarning(s"Can not drop nested partition path fields: $nestedPartitionPathFields")
- }
-
- val partitionPathCols = (partitionPathFields -- nestedPartitionPathFields).toSeq
-
- df.drop(partitionPathCols: _*)
- }
-
private def getPartitionPathFields(config: HoodieWriteConfig): Seq[String] = {
val keyGeneratorClassName = config.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME)
val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
keyGenerator.getPartitionPathFields.asScala
}
+
+ def getPartitionPathCols(config: HoodieWriteConfig): Seq[String] = {
+ val partitionPathFields = getPartitionPathFields(config).toSet
+ val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
+
+ return (partitionPathFields -- nestedPartitionPathFields).toSeq
+ }
+
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index fb0218137d2..1e20e4ab663 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -95,8 +95,7 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows = getPartitioner(populateMetaFields, isTablePartitioned);
- boolean shouldDropPartitionColumns = writeConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS());
- Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns, instantTime);
+ Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, instantTime);
preExecute();
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = buildHoodieWriteMetadata(doExecute(hoodieDF, bulkInsertPartitionerRows.arePartitionRecordsSorted()));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index 52c45686978..50ec641c182 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -128,7 +128,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false, "0000000001");
+ new NonSortPartitionerWithRows(), "0000000001");
StructType resultSchema = result.schema();
assertEquals(result.count(), 10);
@@ -172,7 +172,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
.build();
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false, "000001111");
+ new NonSortPartitionerWithRows(), "000001111");
StructType resultSchema = result.schema();
assertEquals(result.count(), 10);
@@ -209,7 +209,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
rows.addAll(updates);
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false, "000001111");
+ new NonSortPartitionerWithRows(), "000001111");
StructType resultSchema = result.schema();
assertEquals(result.count(), enablePreCombine ? 10 : 15);
@@ -313,7 +313,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
try {
Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false, "000001111");
+ new NonSortPartitionerWithRows(), "000001111");
preparedDF.count();
fail("Should have thrown exception");
} catch (Exception e) {
@@ -325,7 +325,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
dataset = sqlContext.createDataFrame(rows, structType);
try {
Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false, "000001111");
+ new NonSortPartitionerWithRows(), "000001111");
preparedDF.count();
fail("Should have thrown exception");
} catch (Exception e) {
@@ -337,7 +337,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
dataset = sqlContext.createDataFrame(rows, structType);
try {
Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false, "000001111");
+ new NonSortPartitionerWithRows(), "000001111");
preparedDF.count();
fail("Should have thrown exception");
} catch (Exception e) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 865ca147eb0..38221cc05c7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.functions.{expr, lit}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.hudi.command.SqlKeyGenerator
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertNull, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments.arguments
@@ -365,6 +365,52 @@ class TestHoodieSparkSqlWriter {
testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields)
}
+@Test
+def testBulkInsertForDropPartitionColumn(): Unit = {
+ //create a new table
+ val tableName = "trips_table"
+ val basePath = "file:///tmp/trips_table"
+ val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city")
+ val data =
+ Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
+ (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
+ (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
+ (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
+ (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"));
+
+ var inserts = spark.createDataFrame(data).toDF(columns: _*)
+ inserts.write.format("hudi").
+ option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "city").
+ option(HoodieWriteConfig.TABLE_NAME, tableName).
+ option("hoodie.datasource.write.recordkey.field", "uuid").
+ option("hoodie.datasource.write.precombine.field", "rider").
+ option("hoodie.datasource.write.operation", "bulk_insert").
+ option("hoodie.datasource.write.hive_style_partitioning", "true").
+ option("hoodie.populate.meta.fields", "false").
+ option("hoodie.datasource.write.drop.partition.columns", "true").
+ mode(SaveMode.Overwrite).
+ save(basePath)
+
+ // Ensure the partition column (i.e 'city') can be read back
+ val tripsDF = spark.read.format("hudi").load(basePath)
+ tripsDF.show()
+ tripsDF.select("city").foreach(row => {
+ assertNotNull(row)
+ })
+
+ // Peek into the raw parquet file and ensure partition column is not written to the file
+ val partitions = Seq("city=san_francisco", "city=chennai", "city=sao_paulo")
+ val partitionPaths = new Array[String](3)
+ for (i <- partitionPaths.indices) {
+ partitionPaths(i) = String.format("%s/%s/*", basePath, partitions(i))
+ }
+ val rawFileDf = spark.sqlContext.read.parquet(partitionPaths(0), partitionPaths(1), partitionPaths(2))
+ rawFileDf.show()
+ rawFileDf.select("city").foreach(row => {
+ assertNull(row.get(0))
+ })
+}
+
/**
* Test case for disable and enable meta fields.
*/