yihua commented on code in PR #12772:
URL: https://github.com/apache/hudi/pull/12772#discussion_r2355861073
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java:
##########
@@ -72,7 +72,7 @@ public SparkReaderContextFactory(HoodieSparkEngineContext hoodieSparkEngineConte
   public SparkReaderContextFactory(HoodieSparkEngineContext hoodieSparkEngineContext, HoodieTableMetaClient metaClient,
                                    TableSchemaResolver resolver, SparkAdapter sparkAdapter) {
-    SQLConf sqlConf = hoodieSparkEngineContext.getSqlContext().sessionState().conf();
+    SQLConf sqlConf = hoodieSparkEngineContext.getSqlContext().sparkSession().sessionState().conf();
Review Comment:
Note to myself: this is because of the Spark-side API change where
`SQLContext` does not provide `sessionState()` any more in Spark 4
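A quick before/after sketch of the access path (not a compilable unit, just the two call chains from the diff side by side):

```java
// Spark 3.x: SQLConf is reachable directly off the SQLContext
SQLConf conf3 = hoodieSparkEngineContext.getSqlContext().sessionState().conf();

// Spark 4.x: SQLContext no longer exposes sessionState(), so the
// SQLConf has to be reached through the owning SparkSession
SQLConf conf4 = hoodieSparkEngineContext.getSqlContext().sparkSession().sessionState().conf();
```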
##########
.github/workflows/bot.yml:
##########
@@ -1132,6 +1189,7 @@ jobs:
- sparkProfile: 'spark3.5'
flinkProfile: 'flink1.20'
sparkArchive: 'spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz'
+
Review Comment:
nit: remove empty line
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java:
##########
@@ -45,8 +47,11 @@ public abstract class RDDBucketIndexPartitioner<T> extends BucketIndexBulkInsert
   public static final Logger LOG = LogManager.getLogger(RDDBucketIndexPartitioner.class);
+  private final HoodieUTF8StringFactory hoodieUTF8StringFactory;
Review Comment:
```suggestion
private final HoodieUTF8StringFactory utf8StringFactory;
```
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java:
##########
@@ -45,8 +47,11 @@ public abstract class RDDBucketIndexPartitioner<T> extends BucketIndexBulkInsert
   public static final Logger LOG = LogManager.getLogger(RDDBucketIndexPartitioner.class);
+  private final HoodieUTF8StringFactory hoodieUTF8StringFactory;
+
   public RDDBucketIndexPartitioner(HoodieTable table, String sortString, boolean preserveHoodieMetadata) {
     super(table, sortString, preserveHoodieMetadata);
+    this.hoodieUTF8StringFactory = SparkAdapterSupport$.MODULE$.sparkAdapter().getHoodieUTF8StringFactory();
Review Comment:
```suggestion
    this.hoodieUTF8StringFactory = SparkAdapterSupport$.MODULE$.sparkAdapter().getUTF8StringFactory();
```
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java:
##########
@@ -63,8 +67,8 @@ public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, Hood
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                      int outputSparkPartitions) {
     return records
-        .sortBy(record -> SortUtils.getComparableSortColumns(record, sortColumnNames, serializableSchema.get(), suffixRecordKey, consistentLogicalTimestampEnabled),
-            true, outputSparkPartitions);
+        .sortBy(record -> SortUtils.getComparableSortColumns(record, sortColumnNames, serializableSchema.get(), suffixRecordKey, consistentLogicalTimestampEnabled,
+            hoodieUTF8StringFactory::wrapArrayOfObjects),true, outputSparkPartitions);
Review Comment:
```suggestion
            hoodieUTF8StringFactory::wrapArrayOfObjects), true, outputSparkPartitions);
```
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -368,10 +369,19 @@ private ValueWriter threeLevelArrayWriter(String repeatedFieldName, String eleme
   }
   private static class HoodieBloomFilterRowWriteSupport extends HoodieBloomFilterWriteSupport<UTF8String> {
+
+    private static final HoodieUTF8StringFactory HOODIE_UTF8STRING_FACTORY =
+        SparkAdapterSupport$.MODULE$.sparkAdapter().getHoodieUTF8StringFactory();
Review Comment:
```suggestion
    private static final HoodieUTF8StringFactory UTF8STRING_FACTORY =
        SparkAdapterSupport$.MODULE$.sparkAdapter().getHoodieUTF8StringFactory();
```
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -360,7 +360,12 @@ protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties props
         HoodieInternalRowUtils.getCachedPosList(structType, field);
     if (cachedNestedFieldPath.isDefined()) {
       NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get();
-      return (Comparable<?>) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
+      if (nestedFieldPath.parts()[0]._2.dataType() instanceof org.apache.spark.sql.types.StringType) {
+        return SparkAdapterSupport$.MODULE$.sparkAdapter().getHoodieUTF8StringFactory()
+            .wrapUTF8String((UTF8String) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath));
+      } else {
+        return (Comparable<?>) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
+      }
Review Comment:
   nit: the shared `getNestedInternalRowValue` call can be hoisted outside the `if` branch for clarity
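   A sketch of the hoisted form the nit is suggesting (same identifiers as the diff above, untested):

```java
// Fetch the nested value once, then branch only on how to wrap it
Object nestedValue = HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
if (nestedFieldPath.parts()[0]._2.dataType() instanceof org.apache.spark.sql.types.StringType) {
  return SparkAdapterSupport$.MODULE$.sparkAdapter().getHoodieUTF8StringFactory()
      .wrapUTF8String((UTF8String) nestedValue);
}
return (Comparable<?>) nestedValue;
```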
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java:
##########
@@ -44,6 +46,8 @@ public class RDDCustomColumnsSortPartitioner<T>
   private final SerializableSchema serializableSchema;
   private final boolean consistentLogicalTimestampEnabled;
   private final boolean suffixRecordKey;
+  private final HoodieUTF8StringFactory hoodieUTF8StringFactory =
Review Comment:
   Rename to `utf8StringFactory` here as well
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]