alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r920463727
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java:
##########
@@ -54,42 +63,64 @@
private final SerializableSchema schema;
private final HoodieClusteringConfig.LayoutOptimizationStrategy
layoutOptStrategy;
private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType
curveCompositionStrategyType;
+ private final HoodieRecordType recordType;
public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext
sparkEngineContext,
- String[] orderByColumns,
-
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
-
HoodieClusteringConfig.SpatialCurveCompositionStrategyType
curveCompositionStrategyType,
- Schema schema) {
+ String[] orderByColumns,
+ LayoutOptimizationStrategy layoutOptStrategy,
+ SpatialCurveCompositionStrategyType curveCompositionStrategyType,
+ Schema schema, HoodieRecordType recordType) {
this.sparkEngineContext = sparkEngineContext;
this.orderByColumns = orderByColumns;
this.layoutOptStrategy = layoutOptStrategy;
this.curveCompositionStrategyType = curveCompositionStrategyType;
this.schema = new SerializableSchema(schema);
+ this.recordType = recordType;
}
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>>
records, int outputSparkPartitions) {
- JavaRDD<GenericRecord> genericRecordsRDD =
- records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new
Properties()).get());
-
- Dataset<Row> sourceDataset =
- AvroConversionUtils.createDataFrame(
- genericRecordsRDD.rdd(),
- schema.toString(),
- sparkEngineContext.getSqlContext().sparkSession()
- );
-
- Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
-
- return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(),
schema.get().getNamespace(), false, Option.empty())
- .toJavaRDD()
- .map(record -> {
- String key =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- String partition =
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
- HoodieKey hoodieKey = new HoodieKey(key, partition);
- HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new
RewriteAvroPayload(record));
- return hoodieRecord;
- });
+ if (recordType == HoodieRecordType.AVRO) {
+ JavaRDD<GenericRecord> genericRecordsRDD =
+ records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new
Properties()).get());
+
+ Dataset<Row> sourceDataset =
+ AvroConversionUtils.createDataFrame(
+ genericRecordsRDD.rdd(),
+ schema.toString(),
+ sparkEngineContext.getSqlContext().sparkSession()
+ );
+
+ Dataset<Row> sortedDataset = reorder(sourceDataset,
outputSparkPartitions);
+
+ return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(),
schema.get().getNamespace(), false, Option.empty())
+ .toJavaRDD()
+ .map(record -> {
+ String key =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ String partition =
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+ HoodieKey hoodieKey = new HoodieKey(key, partition);
+ HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new
RewriteAvroPayload(record));
+ return hoodieRecord;
+ });
+ } else if (recordType == HoodieRecordType.SPARK) {
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(schema.get());
+ Dataset<Row> sourceDataset =
SparkConversionUtils.createDataFrame(records.rdd(),
sparkEngineContext.getSqlContext().sparkSession(), structType);
+
+ Dataset<Row> sortedDataset = reorder(sourceDataset,
outputSparkPartitions);
+
+ return sortedDataset.queryExecution().toRdd()
+ .toJavaRDD()
+ .map(row -> {
+ InternalRow internalRow = row.copy();
Review Comment:
@xushiyan in short: you can't hold a reference to an `InternalRow` outside
the closure where you have access to it, since Spark internally reuses a
mutable buffer to keep the number of allocations to a minimum (in other
words, you always get back the same `InternalRow` object, which is reset
every time the iterator advances)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]