yihua commented on code in PR #5664:
URL: https://github.com/apache/hudi/pull/5664#discussion_r889380010
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -87,6 +89,7 @@ public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteCo
    this.populateMetaFields = populateMetaFields;
    this.arePartitionRecordsSorted = arePartitionRecordsSorted;
    this.fileIdPrefix = UUID.randomUUID().toString();
+   this.isHiveStylePartitioning = writeConfig.isHiveStylePartitioningEnabled();
Review Comment:
nit: `writeConfig` is already saved inside this helper, so we don't need another member variable `isHiveStylePartitioning`?
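The nit above amounts to dropping the cached flag and reading it from the saved config at the point of use. A minimal sketch with hypothetical stand-in classes (`WriteConfig` and `WriterHelper` stand in for the Hudi types; only `isHiveStylePartitioningEnabled()` mirrors the real API):

```java
// Hypothetical stand-in for HoodieWriteConfig.
class WriteConfig {
  private final boolean hiveStylePartitioningEnabled;

  WriteConfig(boolean enabled) {
    this.hiveStylePartitioningEnabled = enabled;
  }

  boolean isHiveStylePartitioningEnabled() {
    return hiveStylePartitioningEnabled;
  }
}

// Hypothetical stand-in for BulkInsertDataInternalWriterHelper.
class WriterHelper {
  private final WriteConfig writeConfig; // already a member of the helper

  WriterHelper(WriteConfig writeConfig) {
    this.writeConfig = writeConfig;
    // no extra isHiveStylePartitioning field needed
  }

  String partitionPathFor(String field, String value) {
    // read the flag from the saved config on demand
    return writeConfig.isHiveStylePartitioningEnabled() ? field + "=" + value : value;
  }
}
```

The trade-off is one virtual call per use versus one fewer field to keep in sync with the config.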
##########
hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java:
##########
@@ -109,6 +109,48 @@ public void testDataInternalWriter(boolean sorted, boolean populateMetaFields) t
}
}
+  @Test
+  public void testDataInternalWriterHiveStylePartitioning() throws Exception {
+    boolean sorted = true;
+    boolean populateMetaFields = false;
+    // init config and table
+    HoodieWriteConfig cfg = getWriteConfig(populateMetaFields, "true");
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    for (int i = 0; i < 1; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
+          STRUCT_TYPE, populateMetaFields, sorted);
+
+      int size = 10 + RANDOM.nextInt(1000);
+      // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file
+      int batches = 3;
+      Dataset<Row> totalInputRows = null;
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit();
+      Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
+      Option<List<String>> fileNames = Option.of(new ArrayList<>());
+
+      // verify write statuses
+      assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames);
+
+      // verify rows
+      Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
+      assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields);
Review Comment:
Do we want to validate the hive-style partition path value somewhere?
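One way to add that validation would be to scan the collected file paths for a hive-style `field=value` directory segment. A hedged sketch with a hypothetical helper (the paths and field names below are illustrative, not from the PR):

```java
// Hypothetical helper sketching the validation the reviewer asks about:
// check that a written file's absolute path contains a hive-style
// "field=value" partition directory segment.
class HiveStylePathCheck {
  static boolean hasHiveStylePartition(String fileAbsPath, String partitionField) {
    for (String segment : fileAbsPath.split("/")) {
      // a hive-style segment starts with "<field>=" and has a non-empty value
      if (segment.startsWith(partitionField + "=") && segment.length() > partitionField.length() + 1) {
        return true;
      }
    }
    return false;
  }
}
```

In the test above, such a check could run over `fileAbsPaths.get()` after `assertWriteStatuses` populates it.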
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -128,7 +133,11 @@ public void write(InternalRow record) throws IOException {
if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen
partitionPath = "";
} else if (simpleKeyGen) { // SimpleKeyGen
-      partitionPath = (record.get(simplePartitionFieldIndex, simplePartitionFieldDataType)).toString();
+      Object partitionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType);
+      partitionPath = partitionPathValue != null ? partitionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
+      if (isHiveStylePartitioning) {
+        partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;
Review Comment:
For `SimpleKeyGenerator`, there can be only one partition path field. Is that correct?
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -128,7 +133,11 @@ public void write(InternalRow record) throws IOException {
if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen
partitionPath = "";
} else if (simpleKeyGen) { // SimpleKeyGen
-      partitionPath = (record.get(simplePartitionFieldIndex, simplePartitionFieldDataType)).toString();
+      Object partitionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType);
+      partitionPath = partitionPathValue != null ? partitionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
+      if (isHiveStylePartitioning) {
+        partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;
Review Comment:
@nsivabalan could you simply leverage
`SimpleKeyGenerator::getPartitionPath(GenericRecord record)` or
`KeyGenUtils::getPartitionPath` API instead of hardcoding the value
construction here?
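The point in the last two comments can be sketched as a single helper owning both the null handling and the hive-style prefix, so call sites don't hardcode the `field + "=" + value` construction. This is an illustrative stand-in, not Hudi's `KeyGenUtils`; the constant is assumed to mirror what `PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH` holds:

```java
// Illustrative sketch of centralized partition-path construction.
// DEFAULT_PARTITION_PATH is assumed to mirror Hudi's
// PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH.
class PartitionPathBuilder {
  static final String DEFAULT_PARTITION_PATH = "__HIVE_DEFAULT_PARTITION__";

  static String build(String partitionField, Object value, boolean hiveStyle) {
    // null partition values fall back to the default partition name
    String path = (value != null) ? value.toString() : DEFAULT_PARTITION_PATH;
    // hive-style partitioning prefixes the value with "<field>="
    return hiveStyle ? partitionField + "=" + path : path;
  }
}
```

Routing all writers through one such helper keeps the encoding rules (null handling, hive-style prefixing) consistent across key generators.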
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]