alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r920465935


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java:
##########
@@ -54,42 +63,64 @@
   private final SerializableSchema schema;
   private final HoodieClusteringConfig.LayoutOptimizationStrategy 
layoutOptStrategy;
   private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType 
curveCompositionStrategyType;
+  private final HoodieRecordType recordType;
 
   public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext 
sparkEngineContext,
-                                        String[] orderByColumns,
-                                        
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
-                                        
HoodieClusteringConfig.SpatialCurveCompositionStrategyType 
curveCompositionStrategyType,
-                                        Schema schema) {
+      String[] orderByColumns,
+      LayoutOptimizationStrategy layoutOptStrategy,
+      SpatialCurveCompositionStrategyType curveCompositionStrategyType,
+      Schema schema, HoodieRecordType recordType) {
     this.sparkEngineContext = sparkEngineContext;
     this.orderByColumns = orderByColumns;
     this.layoutOptStrategy = layoutOptStrategy;
     this.curveCompositionStrategyType = curveCompositionStrategyType;
     this.schema = new SerializableSchema(schema);
+    this.recordType = recordType;
   }
 
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> 
records, int outputSparkPartitions) {
-    JavaRDD<GenericRecord> genericRecordsRDD =
-        records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new 
Properties()).get());
-
-    Dataset<Row> sourceDataset =
-        AvroConversionUtils.createDataFrame(
-            genericRecordsRDD.rdd(),
-            schema.toString(),
-            sparkEngineContext.getSqlContext().sparkSession()
-        );
-
-    Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
-
-    return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), 
schema.get().getNamespace(), false, Option.empty())
-        .toJavaRDD()
-        .map(record -> {
-          String key = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-          String partition = 
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
-          HoodieKey hoodieKey = new HoodieKey(key, partition);
-          HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new 
RewriteAvroPayload(record));
-          return hoodieRecord;
-        });
+    if (recordType == HoodieRecordType.AVRO) {
+      JavaRDD<GenericRecord> genericRecordsRDD =
+          records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new 
Properties()).get());
+
+      Dataset<Row> sourceDataset =
+          AvroConversionUtils.createDataFrame(
+              genericRecordsRDD.rdd(),
+              schema.toString(),
+              sparkEngineContext.getSqlContext().sparkSession()
+          );
+
+      Dataset<Row> sortedDataset = reorder(sourceDataset, 
outputSparkPartitions);
+
+      return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), 
schema.get().getNamespace(), false, Option.empty())
+          .toJavaRDD()
+          .map(record -> {
+            String key = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+            String partition = 
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+            HoodieKey hoodieKey = new HoodieKey(key, partition);
+            HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new 
RewriteAvroPayload(record));
+            return hoodieRecord;
+          });
+    } else if (recordType == HoodieRecordType.SPARK) {
+      StructType structType = 
HoodieInternalRowUtils.getCachedSchema(schema.get());
+      Dataset<Row> sourceDataset = 
SparkConversionUtils.createDataFrame(records.rdd(), 
sparkEngineContext.getSqlContext().sparkSession(), structType);
+
+      Dataset<Row> sortedDataset = reorder(sourceDataset, 
outputSparkPartitions);
+
+      return sortedDataset.queryExecution().toRdd()
+          .toJavaRDD()
+          .map(row -> {
+            InternalRow internalRow = row.copy();

Review Comment:
   @minihippo let's add a comment explaining why they copy is being made here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to