alexeykudinkin commented on a change in pull request #4234:
URL: https://github.com/apache/hudi/pull/4234#discussion_r779161580



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
##########
@@ -62,16 +61,12 @@ public 
RDDSpatialCurveOptimizationSortPartitioner(HoodieSparkEngineContext spark
 
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> 
records, int outputSparkPartitions) {
-    String payloadClass = config.getPayloadClass();
-    // do sort
     JavaRDD<GenericRecord> preparedRecord = prepareGenericRecord(records, 
outputSparkPartitions, serializableSchema.get());
     return preparedRecord.map(record -> {
       String key = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
       String partition = 
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
       HoodieKey hoodieKey = new HoodieKey(key, partition);
-      HoodieRecordPayload avroPayload = 
ReflectionUtils.loadPayload(payloadClass,
-          new Object[] {Option.of(record)}, Option.class);
-      HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload);
+      HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, new 
RewriteAvroPayload(record));

Review comment:
      Correct. This is done by the 
`MultipleSparkJobExecutionStrategy.readRecordsForGroup` method prior to calling 
into the Partitioner.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java
##########
@@ -205,48 +179,91 @@ public boolean hasNext() {
         @Override
         public Row next() {
           Row row = rows.next();
-          List<Long> longList = fieldMap.entrySet().stream().map(entry -> {
-            int index = entry.getKey();
-            StructField field = entry.getValue();
-            DataType dataType = field.dataType();
-            if (dataType instanceof LongType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
-            } else if (dataType instanceof DoubleType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
Double.doubleToLongBits(row.getDouble(index));
-            } else if (dataType instanceof IntegerType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
(long)row.getInt(index);
-            } else if (dataType instanceof FloatType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
Double.doubleToLongBits((double) row.getFloat(index));
-            } else if (dataType instanceof StringType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
BinaryUtil.convertStringToLong(row.getString(index));
-            } else if (dataType instanceof DateType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
row.getDate(index).getTime();
-            } else if (dataType instanceof TimestampType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
row.getTimestamp(index).getTime();
-            } else if (dataType instanceof ByteType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)});
-            } else if (dataType instanceof ShortType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
(long)row.getShort(index);
-            } else if (dataType instanceof DecimalType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
row.getDecimal(index).longValue();
-            } else if (dataType instanceof BooleanType) {
-              boolean value = row.isNullAt(index) ? false : 
row.getBoolean(index);
-              return value ? Long.MAX_VALUE : 0;
-            } else if (dataType instanceof BinaryType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
BinaryUtil.convertBytesToLong((byte[]) row.get(index));
-            }
-            return null;
-          }).filter(f -> f != null).collect(Collectors.toList());
+          long[] longs = fieldMap.entrySet().stream()
+              .mapToLong(entry -> {
+                int index = entry.getKey();
+                StructField field = entry.getValue();
+                return mapColumnValueToLong(row, index, field.dataType());
+              })
+              .toArray();
 
-          byte[] hilbertValue = HilbertCurveUtils.indexBytes(
-              hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);
-          List<Object> values = new ArrayList<>();
-          
values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
-          values.add(hilbertValue);
-          return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
+          // Map N-dimensional coordinates into position on the Hilbert curve
+          byte[] hilbertCurvePosBytes = 
HilbertCurveUtils.indexBytes(hilbertCurve, longs, 63);
+          return appendToRow(row, hilbertCurvePosBytes);
         }
       };
-    }).sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, 
fileNum);
+    })
+        .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, 
fileNum);
+  }
+
+  private static Row appendToRow(Row row, Object value) {
+    // NOTE: This is an ugly hack to avoid array re-allocation --
+    //       Spark's {@code Row#toSeq} returns array of Objects
+    Object[] currentValues = (Object[]) ((WrappedArray<Object>) 
row.toSeq()).array();
+    return RowFactory.create(CollectionUtils.append(currentValues, value));
+  }
+
+  @Nonnull
+  private static byte[] mapColumnValueTo8Bytes(Row row, int index, DataType 
dataType) {

Review comment:
       Yeah, this commit was incorrectly placed into this branch instead of the 
one it was stacked on.
   
   Summary of the changes here:
   1. This large conditional was extracted to standalone method (no changes 
w/in conditional itself)
   2. In the surrounding context, the following things changed:
   2.a Added the `appendToRow` method to avoid a double conversion from Scala > 
Java > Scala
   2.b Used primitive Stream methods (`mapToLong`) to avoid unnecessary boxing




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to