alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r920465935
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java:
##########
@@ -54,42 +63,64 @@
private final SerializableSchema schema;
private final HoodieClusteringConfig.LayoutOptimizationStrategy
layoutOptStrategy;
private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType
curveCompositionStrategyType;
+ private final HoodieRecordType recordType;
public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext
sparkEngineContext,
- String[] orderByColumns,
-
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
-
HoodieClusteringConfig.SpatialCurveCompositionStrategyType
curveCompositionStrategyType,
- Schema schema) {
+ String[] orderByColumns,
+ LayoutOptimizationStrategy layoutOptStrategy,
+ SpatialCurveCompositionStrategyType curveCompositionStrategyType,
+ Schema schema, HoodieRecordType recordType) {
this.sparkEngineContext = sparkEngineContext;
this.orderByColumns = orderByColumns;
this.layoutOptStrategy = layoutOptStrategy;
this.curveCompositionStrategyType = curveCompositionStrategyType;
this.schema = new SerializableSchema(schema);
+ this.recordType = recordType;
}
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>>
records, int outputSparkPartitions) {
- JavaRDD<GenericRecord> genericRecordsRDD =
- records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new
Properties()).get());
-
- Dataset<Row> sourceDataset =
- AvroConversionUtils.createDataFrame(
- genericRecordsRDD.rdd(),
- schema.toString(),
- sparkEngineContext.getSqlContext().sparkSession()
- );
-
- Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
-
- return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(),
schema.get().getNamespace(), false, Option.empty())
- .toJavaRDD()
- .map(record -> {
- String key =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- String partition =
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
- HoodieKey hoodieKey = new HoodieKey(key, partition);
- HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new
RewriteAvroPayload(record));
- return hoodieRecord;
- });
+ if (recordType == HoodieRecordType.AVRO) {
+ JavaRDD<GenericRecord> genericRecordsRDD =
+ records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new
Properties()).get());
+
+ Dataset<Row> sourceDataset =
+ AvroConversionUtils.createDataFrame(
+ genericRecordsRDD.rdd(),
+ schema.toString(),
+ sparkEngineContext.getSqlContext().sparkSession()
+ );
+
+ Dataset<Row> sortedDataset = reorder(sourceDataset,
outputSparkPartitions);
+
+ return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(),
schema.get().getNamespace(), false, Option.empty())
+ .toJavaRDD()
+ .map(record -> {
+ String key =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ String partition =
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+ HoodieKey hoodieKey = new HoodieKey(key, partition);
+ HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new
RewriteAvroPayload(record));
+ return hoodieRecord;
+ });
+ } else if (recordType == HoodieRecordType.SPARK) {
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(schema.get());
+ Dataset<Row> sourceDataset =
SparkConversionUtils.createDataFrame(records.rdd(),
sparkEngineContext.getSqlContext().sparkSession(), structType);
+
+ Dataset<Row> sortedDataset = reorder(sourceDataset,
outputSparkPartitions);
+
+ return sortedDataset.queryExecution().toRdd()
+ .toJavaRDD()
+ .map(row -> {
+ InternalRow internalRow = row.copy();
Review Comment:
@minihippo let's add a comment explaining why the copy is being made here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]