vinothchandar commented on a change in pull request #4234:
URL: https://github.com/apache/hudi/pull/4234#discussion_r775959711



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java
##########
@@ -90,7 +90,7 @@ public static long getObjectSize(Object obj) throws 
UnsupportedOperationExceptio
   private final Map<Class<?>, ClassSizeInfo> classSizeInfos = new 
IdentityHashMap<>();
 
   private final Set<Object> alreadyVisited = Collections.newSetFromMap(new 
IdentityHashMap<>());
-  private final Deque<Object> pending = new ArrayDeque<>(16 * 1024);
+  private final Deque<Object> pending = new ArrayDeque<>(64);

Review comment:
       Can you explain this change?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/hadoop/PathCachingFileName.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+
+import java.net.URI;
+
+/**
+ * NOTE: This class is thread-safe
+ */
+public class PathCachingFileName extends Path {
+
+  // NOTE: volatile keyword is redundant here and put mostly for reader 
notice, since all

Review comment:
       Remove this comment? We don't want to overload the code with notes like these. I'd 
expect people to understand `volatile`.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java
##########
@@ -205,48 +179,91 @@ public boolean hasNext() {
         @Override
         public Row next() {
           Row row = rows.next();
-          List<Long> longList = fieldMap.entrySet().stream().map(entry -> {
-            int index = entry.getKey();
-            StructField field = entry.getValue();
-            DataType dataType = field.dataType();
-            if (dataType instanceof LongType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
-            } else if (dataType instanceof DoubleType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
Double.doubleToLongBits(row.getDouble(index));
-            } else if (dataType instanceof IntegerType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
(long)row.getInt(index);
-            } else if (dataType instanceof FloatType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
Double.doubleToLongBits((double) row.getFloat(index));
-            } else if (dataType instanceof StringType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
BinaryUtil.convertStringToLong(row.getString(index));
-            } else if (dataType instanceof DateType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
row.getDate(index).getTime();
-            } else if (dataType instanceof TimestampType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
row.getTimestamp(index).getTime();
-            } else if (dataType instanceof ByteType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)});
-            } else if (dataType instanceof ShortType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
(long)row.getShort(index);
-            } else if (dataType instanceof DecimalType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
row.getDecimal(index).longValue();
-            } else if (dataType instanceof BooleanType) {
-              boolean value = row.isNullAt(index) ? false : 
row.getBoolean(index);
-              return value ? Long.MAX_VALUE : 0;
-            } else if (dataType instanceof BinaryType) {
-              return row.isNullAt(index) ? Long.MAX_VALUE : 
BinaryUtil.convertBytesToLong((byte[]) row.get(index));
-            }
-            return null;
-          }).filter(f -> f != null).collect(Collectors.toList());
+          long[] longs = fieldMap.entrySet().stream()
+              .mapToLong(entry -> {
+                int index = entry.getKey();
+                StructField field = entry.getValue();
+                return mapColumnValueToLong(row, index, field.dataType());
+              })
+              .toArray();
 
-          byte[] hilbertValue = HilbertCurveUtils.indexBytes(
-              hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);
-          List<Object> values = new ArrayList<>();
-          
values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
-          values.add(hilbertValue);
-          return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
+          // Map N-dimensional coordinates into position on the Hilbert curve
+          byte[] hilbertCurvePosBytes = 
HilbertCurveUtils.indexBytes(hilbertCurve, longs, 63);
+          return appendToRow(row, hilbertCurvePosBytes);
         }
       };
-    }).sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, 
fileNum);
+    })
+        .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, 
fileNum);
+  }
+
+  private static Row appendToRow(Row row, Object value) {
+    // NOTE: This is an ugly hack to avoid array re-allocation --
+    //       Spark's {@code Row#toSeq} returns array of Objects
+    Object[] currentValues = (Object[]) ((WrappedArray<Object>) 
row.toSeq()).array();
+    return RowFactory.create(CollectionUtils.append(currentValues, value));
+  }
+
+  @Nonnull
+  private static byte[] mapColumnValueTo8Bytes(Row row, int index, DataType 
dataType) {

Review comment:
       This is very hard to review, and it pushes all the overhead to the 
reviewer. Can you comment on the PR to call out what has actually changed?  
   
   > please separate refactoring with the actual implementation of functionality
   
   I'd also prefer if we did these restructuring/moving in a separate PR when 
possible. makes for an easier review. 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -82,6 +82,7 @@ public 
SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
         .performClustering(clusteringPlan, schema, instantTime);
     JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
     JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
+    // TODO review, this pulls in all of the RDDs into driver's memory

Review comment:
       This only pulls the write stats, not the entire RDD. Let's remove the 
comment. 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
##########
@@ -62,16 +61,12 @@ public 
RDDSpatialCurveOptimizationSortPartitioner(HoodieSparkEngineContext spark
 
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> 
records, int outputSparkPartitions) {
-    String payloadClass = config.getPayloadClass();
-    // do sort
     JavaRDD<GenericRecord> preparedRecord = prepareGenericRecord(records, 
outputSparkPartitions, serializableSchema.get());
     return preparedRecord.map(record -> {
       String key = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
       String partition = 
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
       HoodieKey hoodieKey = new HoodieKey(key, partition);
-      HoodieRecordPayload avroPayload = 
ReflectionUtils.loadPayload(payloadClass,
-          new Object[] {Option.of(record)}, Option.class);
-      HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload);
+      HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, new 
RewriteAvroPayload(record));

Review comment:
       This is okay, as long as the new base and log files have already been merged 
in `records` — I think that is the case, but could you confirm again? The 
case for using the user-defined payload class for clustering is when the 
clustering also implicitly compacts the base and log files. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/hadoop/PathCachingFileName.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+
+import java.net.URI;
+
+/**
+ * NOTE: This class is thread-safe
+ */
+public class PathCachingFileName extends Path {

Review comment:
       Rename to `FileNameCachingPath` or `NameCachedPath` — the name should end with `Path`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to