alexeykudinkin commented on a change in pull request #4234:
URL: https://github.com/apache/hudi/pull/4234#discussion_r778545196
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -82,6 +82,7 @@ public
SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
.performClustering(clusteringPlan, schema, instantTime);
JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
+ // TODO review, this pulls in all of the RDDs into driver's memory
Review comment:
I was leaving these TODOs for myself and accidentally committed them; I will clean them up.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java
##########
@@ -205,48 +179,91 @@ public boolean hasNext() {
@Override
public Row next() {
Row row = rows.next();
- List<Long> longList = fieldMap.entrySet().stream().map(entry -> {
- int index = entry.getKey();
- StructField field = entry.getValue();
- DataType dataType = field.dataType();
- if (dataType instanceof LongType) {
- return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
- } else if (dataType instanceof DoubleType) {
- return row.isNullAt(index) ? Long.MAX_VALUE :
Double.doubleToLongBits(row.getDouble(index));
- } else if (dataType instanceof IntegerType) {
- return row.isNullAt(index) ? Long.MAX_VALUE :
(long)row.getInt(index);
- } else if (dataType instanceof FloatType) {
- return row.isNullAt(index) ? Long.MAX_VALUE :
Double.doubleToLongBits((double) row.getFloat(index));
- } else if (dataType instanceof StringType) {
- return row.isNullAt(index) ? Long.MAX_VALUE :
BinaryUtil.convertStringToLong(row.getString(index));
- } else if (dataType instanceof DateType) {
- return row.isNullAt(index) ? Long.MAX_VALUE :
row.getDate(index).getTime();
- } else if (dataType instanceof TimestampType) {
- return row.isNullAt(index) ? Long.MAX_VALUE :
row.getTimestamp(index).getTime();
- } else if (dataType instanceof ByteType) {
- return row.isNullAt(index) ? Long.MAX_VALUE :
BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)});
- } else if (dataType instanceof ShortType) {
- return row.isNullAt(index) ? Long.MAX_VALUE :
(long)row.getShort(index);
- } else if (dataType instanceof DecimalType) {
- return row.isNullAt(index) ? Long.MAX_VALUE :
row.getDecimal(index).longValue();
- } else if (dataType instanceof BooleanType) {
- boolean value = row.isNullAt(index) ? false :
row.getBoolean(index);
- return value ? Long.MAX_VALUE : 0;
- } else if (dataType instanceof BinaryType) {
- return row.isNullAt(index) ? Long.MAX_VALUE :
BinaryUtil.convertBytesToLong((byte[]) row.get(index));
- }
- return null;
- }).filter(f -> f != null).collect(Collectors.toList());
+ long[] longs = fieldMap.entrySet().stream()
+ .mapToLong(entry -> {
+ int index = entry.getKey();
+ StructField field = entry.getValue();
+ return mapColumnValueToLong(row, index, field.dataType());
+ })
+ .toArray();
- byte[] hilbertValue = HilbertCurveUtils.indexBytes(
- hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);
- List<Object> values = new ArrayList<>();
-
values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
- values.add(hilbertValue);
- return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
+ // Map N-dimensional coordinates into position on the Hilbert curve
+ byte[] hilbertCurvePosBytes =
HilbertCurveUtils.indexBytes(hilbertCurve, longs, 63);
+ return appendToRow(row, hilbertCurvePosBytes);
}
};
- }).sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true,
fileNum);
+ })
+ .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true,
fileNum);
+ }
+
+ private static Row appendToRow(Row row, Object value) {
+ // NOTE: This is an ugly hack to avoid array re-allocation --
+ // Spark's {@code Row#toSeq} returns array of Objects
+ Object[] currentValues = (Object[]) ((WrappedArray<Object>)
row.toSeq()).array();
+ return RowFactory.create(CollectionUtils.append(currentValues, value));
+ }
+
+ @Nonnull
+ private static byte[] mapColumnValueTo8Bytes(Row row, int index, DataType
dataType) {
Review comment:
Agreed. This actually should have been in the previous change (the one
this was stacked on top of); let me see why it didn't land after the rebase.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/hadoop/PathCachingFileName.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+
+import java.net.URI;
+
+/**
+ * NOTE: This class is thread-safe
+ */
+public class PathCachingFileName extends Path {
+
+ // NOTE: volatile keyword is redundant here and put mostly for reader
notice, since all
Review comment:
But its goal is the opposite: volatile isn't required here, and I added the
comment for those who might decide that it isn't needed and remove it (the
purpose of volatile here is to hint to the reader that reads and writes of the
ref don't need to be synchronized)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]