This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 780a2ac [HUDI-2102] Support hilbert curve for hudi (#3952)
780a2ac is described below
commit 780a2ac5b208b06f53ee64186b5392ebc37eadbc
Author: xiarixiaoyao <[email protected]>
AuthorDate: Sat Nov 27 15:20:19 2021 +0800
[HUDI-2102] Support hilbert curve for hudi (#3952)
Co-authored-by: Y Ethan Guo <[email protected]>
---
NOTICE | 6 +
hudi-client/hudi-client-common/pom.xml | 7 +
.../apache/hudi/config/HoodieClusteringConfig.java | 28 ++
.../apache/hudi/optimize/HilbertCurveUtils.java | 52 +++
.../org/apache/hudi/optimize/ZOrderingUtil.java | 7 +-
.../hudi/optimize/TestHilbertCurveUtils.java | 38 ++
.../apache/hudi/optimize/TestZOrderingUtil.java | 25 ++
...RDDSpatialCurveOptimizationSortPartitioner.java | 8 +-
.../hudi/index/zorder/ZOrderingIndexHelper.java | 17 +-
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 8 +-
.../java/org/apache/spark/OrderingIndexHelper.java | 430 +++++++++++++++++++++
.../spark/sql/hudi/execution/RangeSample.scala | 45 ++-
.../functional/TestTableLayoutOptimization.scala | 248 ++++++++++++
.../benchmark/SpaceCurveOptimizeBenchMark.scala | 118 ++++++
packaging/hudi-flink-bundle/pom.xml | 2 +
packaging/hudi-kafka-connect-bundle/pom.xml | 2 +
packaging/hudi-spark-bundle/pom.xml | 2 +
packaging/hudi-utilities-bundle/pom.xml | 2 +
18 files changed, 1015 insertions(+), 30 deletions(-)
diff --git a/NOTICE b/NOTICE
index 9b24933..437b974 100644
--- a/NOTICE
+++ b/NOTICE
@@ -159,3 +159,9 @@ its NOTICE file:
This product includes software developed at
StreamSets (http://www.streamsets.com/).
+--------------------------------------------------------------------------------
+
+This product includes code from hilbert-curve project
+ * Copyright https://github.com/davidmoten/hilbert-curve
+ * Licensed under the Apache-2.0 License
+
diff --git a/hudi-client/hudi-client-common/pom.xml
b/hudi-client/hudi-client-common/pom.xml
index d30ee29..22ad8ec 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -64,6 +64,13 @@
<artifactId>parquet-avro</artifactId>
</dependency>
+ <!-- Hilbert Curve -->
+ <dependency>
+ <groupId>com.github.davidmoten</groupId>
+ <artifactId>hilbert-curve</artifactId>
+ <version>0.2.2</version>
+ </dependency>
+
<!-- Dropwizard Metrics -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 9a10965..676f2ff 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -542,4 +542,32 @@ public class HoodieClusteringConfig extends HoodieConfig {
}
}
}
+
+ /**
+  * Strategies available for optimizing the layout of Hudi data: records are
+  * ordered along either a z-order curve or a hilbert curve.
+  */
+ public enum BuildLayoutOptimizationStrategy {
+   ZORDER("z-order"),
+   HILBERT("hilbert");
+
+   // Configuration string that selects this strategy.
+   private final String value;
+
+   BuildLayoutOptimizationStrategy(String value) {
+     this.value = value;
+   }
+
+   /**
+    * Returns the configuration string of this strategy ("z-order" or "hilbert").
+    */
+   public String toCustomString() {
+     return value;
+   }
+
+   /**
+    * Resolves a strategy from its configuration string, ignoring case.
+    *
+    * @param value configuration string, e.g. "z-order" or "hilbert"
+    * @return the matching strategy
+    * @throws HoodieException if {@code value} is null or does not name a known strategy
+    */
+   public static BuildLayoutOptimizationStrategy fromValue(String value) {
+     // A null value falls through to the descriptive exception below instead of a bare NPE.
+     if (value != null) {
+       switch (value.toLowerCase(Locale.ROOT)) {
+         case "z-order":
+           return ZORDER;
+         case "hilbert":
+           return HILBERT;
+         default:
+           break;
+       }
+     }
+     throw new HoodieException("Invalid layout optimization strategy: '" + value
+         + "'. Supported values are: z-order, hilbert.");
+   }
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java
new file mode 100644
index 0000000..0f216ab
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.optimize;
+
+import org.davidmoten.hilbert.HilbertCurve;
+
+import java.math.BigInteger;
+
+/**
+ * Helpers for turning Hilbert curve indexes into fixed-width byte arrays.
+ */
+public class HilbertCurveUtils {
+
+  /**
+   * Computes the index of {@code points} on {@code hilbertCurve} and returns the bytes of
+   * that index (as produced by {@code BigInteger.toByteArray()}) sized to {@code paddingNum}.
+   *
+   * @param hilbertCurve curve used to compute the index
+   * @param points coordinates of the point, one entry per dimension
+   * @param paddingNum length of the returned array
+   * @return the index bytes, resized by {@link #paddingToNByte(byte[], int)}
+   */
+  public static byte[] indexBytes(HilbertCurve hilbertCurve, long[] points, int paddingNum) {
+    byte[] indexAsBytes = hilbertCurve.index(points).toByteArray();
+    return paddingToNByte(indexAsBytes, paddingNum);
+  }
+
+  /**
+   * Resizes {@code a} to exactly {@code paddingNum} bytes.
+   *
+   * <p>An array that already has the requested length is returned as-is (same instance).
+   * A longer array keeps only its first {@code paddingNum} bytes; a shorter one is
+   * right-aligned in the result and preceded by zero bytes.
+   *
+   * @param a source bytes
+   * @param paddingNum length of the returned array
+   * @return an array of length {@code paddingNum}
+   */
+  public static byte[] paddingToNByte(byte[] a, int paddingNum) {
+    int length = a.length;
+    if (length == paddingNum) {
+      return a;
+    }
+    // A freshly allocated array is already zero-filled, so only the copy is needed.
+    byte[] sized = new byte[paddingNum];
+    if (length > paddingNum) {
+      System.arraycopy(a, 0, sized, 0, paddingNum);
+    } else {
+      System.arraycopy(a, 0, sized, paddingNum - length, length);
+    }
+    return sized;
+  }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java
index 3aa8080..50827cc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java
@@ -176,9 +176,14 @@ public class ZOrderingUtil {
public static Long convertStringToLong(String a) {
byte[] bytes = utf8To8Byte(a);
+ return convertBytesToLong(bytes);
+ }
+
+ public static long convertBytesToLong(byte[] bytes) {
+ byte[] paddedBytes = paddingTo8Byte(bytes);
long temp = 0L;
for (int i = 7; i >= 0; i--) {
- temp = temp | (((long)bytes[i] & 0xff) << (7 - i) * 8);
+ temp = temp | (((long) paddedBytes[i] & 0xff) << (7 - i) * 8);
}
return temp;
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java
new file mode 100644
index 0000000..5bb482e
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.optimize;
+
+import org.davidmoten.hilbert.HilbertCurve;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Sanity checks for Hilbert curve index computation.
+ */
+public class TestHilbertCurveUtils {
+
+  // Two-dimensional curve with 5 bits per dimension, shared by the assertions below.
+  private static final HilbertCurve INSTANCE = HilbertCurve.bits(5).dimensions(2);
+
+  /**
+   * Verifies the index computed for two fixed points.
+   */
+  @Test
+  public void testIndex() {
+    long[] firstPoint = {1, 2};
+    long[] secondPoint = {0, 16};
+
+    assertEquals(13, INSTANCE.index(firstPoint).intValue());
+    assertEquals(256, INSTANCE.index(secondPoint).intValue());
+  }
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java
index 7dab6c2..a22485f 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java
@@ -126,4 +126,29 @@ public class TestZOrderingUtil {
this.originValue = originValue;
}
}
+
+ /**
+  * Round-trips boundary long values through convertLongToBytes and
+  * ZOrderingUtil.convertBytesToLong and expects the original value back.
+  */
+ @Test
+ public void testConvertBytesToLong() {
+   long[] tests = new long[] {Long.MIN_VALUE, -1L, 0, 1L, Long.MAX_VALUE};
+   for (long expected : tests) {
+     // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
+     assertEquals(expected, ZOrderingUtil.convertBytesToLong(convertLongToBytes(expected)));
+   }
+ }
+
+ /**
+  * Converts a two-byte input, which is shorter than a long, and expects the bytes
+  * to be read as the low-order bytes of the result: {2, 127} gives 2 * 256 + 127.
+  */
+ @Test
+ public void testConvertBytesToLongWithPadding() {
+   byte[] bytes = new byte[] {2, 127};
+   // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
+   assertEquals(2L * 256 + 127, ZOrderingUtil.convertBytesToLong(bytes));
+ }
+
+ /**
+  * Serializes {@code num} into 8 bytes, most significant byte first.
+  */
+ private byte[] convertLongToBytes(long num) {
+   byte[] byteNum = new byte[8];
+   // Start with the top byte (shift 56) and move down one byte per position.
+   int shift = 56;
+   for (int pos = 0; pos < byteNum.length; pos++) {
+     byteNum[pos] = (byte) ((num >> shift) & 0xff);
+     shift -= 8;
+   }
+   return byteNum;
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
index 03fdf5a..51526fc 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
@@ -33,7 +33,7 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
+import org.apache.spark.OrderingIndexHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -79,10 +79,12 @@ public class RDDSpatialCurveOptimizationSortPartitioner<T
extends HoodieRecordPa
switch (config.getLayoutOptimizationCurveBuildMethod()) {
case DIRECT:
- zDataFrame =
ZOrderingIndexHelper.createZIndexedDataFrameByMapValue(originDF,
config.getClusteringSortColumns(), numOutputGroups);
+ zDataFrame = OrderingIndexHelper
+ .createOptimizedDataFrameByMapValue(originDF,
config.getClusteringSortColumns(), numOutputGroups,
config.getLayoutOptimizationStrategy());
break;
case SAMPLE:
- zDataFrame =
ZOrderingIndexHelper.createZIndexedDataFrameBySample(originDF,
config.getClusteringSortColumns(), numOutputGroups);
+ zDataFrame = OrderingIndexHelper
+ .createOptimizeDataFrameBySample(originDF,
config.getClusteringSortColumns(), numOutputGroups,
config.getLayoutOptimizationStrategy());
break;
default:
throw new HoodieException("Not a valid build curve method for
doWriteOperation: ");
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java
index 248c15c..934d1b9 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java
@@ -18,17 +18,19 @@
package org.apache.hudi.index.zorder;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.optimize.ZOrderingUtil;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.io.api.Binary;
@@ -62,10 +64,10 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
-import scala.collection.JavaConversions;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -77,6 +79,8 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import scala.collection.JavaConversions;
+
import static org.apache.hudi.util.DataTypeUtils.areCompatible;
public class ZOrderingIndexHelper {
@@ -189,7 +193,8 @@ public class ZOrderingIndexHelper {
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df,
List<String> zCols, int fileNum) {
- return RangeSampleSort$.MODULE$.sortDataFrameBySample(df,
JavaConversions.asScalaBuffer(zCols), fileNum);
+ return RangeSampleSort$.MODULE$.sortDataFrameBySample(df,
JavaConversions.asScalaBuffer(zCols), fileNum,
+
HoodieClusteringConfig.BuildLayoutOptimizationStrategy.ZORDER.toCustomString());
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df,
String zCols, int fileNum) {
@@ -584,7 +589,7 @@ public class ZOrderingIndexHelper {
* @VisibleForTesting
*/
@Nonnull
- static String createIndexMergeSql(
+ public static String createIndexMergeSql(
@Nonnull String originalIndexTable,
@Nonnull String newIndexTable,
@Nonnull List<String> columns
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 280d24f..7d2fbd3 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -18,8 +18,6 @@
package org.apache.hudi.table;
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -49,6 +47,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
@@ -76,12 +75,15 @@ import
org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.spark.api.java.JavaRDD;
import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java
new file mode 100644
index 0000000..67b1c67
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
+import org.apache.hudi.optimize.HilbertCurveUtils;
+import org.apache.hudi.optimize.ZOrderingUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.io.api.Binary;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.Row$;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
+import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.util.SerializableConfiguration;
+import org.davidmoten.hilbert.HilbertCurve;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConversions;
+
+public class OrderingIndexHelper {
+
+ private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";
+
+ /**
+  * Create optimized DataFrame directly.
+  * Only base type data is supported: long, int, short, double, float, string, timestamp, decimal, date, byte.
+  * This method is more effective than createOptimizeDataFrameBySample.
+  *
+  * <p>The input is returned unchanged when any of {@code sortCols} is missing from the schema.
+  * With a single sort column no curve is computed; the data is only range-partitioned by that
+  * column into {@code fileNum} partitions.
+  *
+  * @param df a spark DataFrame holds parquet files to be read.
+  * @param sortCols ordering columns for the curve
+  * @param fileNum spark partition num
+  * @param sortMode layout optimization strategy
+  * @return a dataFrame ordered by the curve.
+  */
+ public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, List<String> sortCols, int fileNum, String sortMode) {
+   StructField[] fields = df.schema().fields();
+   Map<String, StructField> columnsMap = Arrays.stream(fields).collect(Collectors.toMap(e -> e.name(), e -> e));
+   int fieldNum = fields.length;
+   List<String> checkCols = sortCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
+   // At least one requested sort column is not part of the schema: leave the data as it is.
+   if (sortCols.size() != checkCols.size()) {
+     return df;
+   }
+   // only one col to sort, no need to use a space-filling curve
+   if (sortCols.size() == 1) {
+     // The partition count must be fileNum; fieldNum is the schema width and is unrelated
+     // to the requested number of output partitions.
+     return df.repartitionByRange(fileNum, org.apache.spark.sql.functions.col(sortCols.get(0)));
+   }
+   // Key: position of the sort column in the schema; value: the column's field definition.
+   Map<Integer, StructField> fieldMap = sortCols
+       .stream().collect(Collectors.toMap(e -> Arrays.asList(fields).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
+   // do optimize
+   JavaRDD<Row> sortedRDD;
+   switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) {
+     case ZORDER:
+       sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
+       break;
+     case HILBERT:
+       sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
+       break;
+     default:
+       throw new IllegalArgumentException(String.format("now only support z-order/hilbert optimize but find: %s", sortMode));
+   }
+   // create new StructType: the sorted rows carry one extra trailing binary column
+   List<StructField> newFields = new ArrayList<>(Arrays.asList(fields));
+   newFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty()));
+
+   // create new DataFrame; the helper "Index" column is dropped again before returning
+   return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop("Index");
+ }
+
+ /**
+ * Appends a z-value to every row and sorts the rows by it.
+ * Each column listed in fieldMap (schema position -> field) is encoded through a ZOrderingUtil
+ * helper, the encodings are combined with ZOrderingUtil.interleaving, and the result is added
+ * as an extra trailing value of the row. Rows are then sorted ascending on the value at
+ * position fieldNum into fileNum partitions.
+ * Null column values are replaced by a per-type placeholder (MAX_VALUE for numeric, date and
+ * timestamp types, "" for strings, false for booleans, a single zero byte for binary).
+ * NOTE(review): fieldNum is presumably the number of columns of the incoming rows, so that it
+ * addresses the appended value - confirm against the caller.
+ */
+ private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD,
Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
+ return originRDD.map(row -> {
+ List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
+ int index = entry.getKey();
+ StructField field = entry.getValue();
+ DataType dataType = field.dataType();
+ if (dataType instanceof LongType) {
+ return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ?
Long.MAX_VALUE : row.getLong(index));
+ } else if (dataType instanceof DoubleType) {
+ return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ?
Double.MAX_VALUE : row.getDouble(index));
+ } else if (dataType instanceof IntegerType) {
+ return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ?
Integer.MAX_VALUE : row.getInt(index));
+ } else if (dataType instanceof FloatType) {
+ return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ?
Float.MAX_VALUE : row.getFloat(index));
+ } else if (dataType instanceof StringType) {
+ return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" :
row.getString(index));
+ } else if (dataType instanceof DateType) {
+ return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ?
Long.MAX_VALUE : row.getDate(index).getTime());
+ } else if (dataType instanceof TimestampType) {
+ return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ?
Long.MAX_VALUE : row.getTimestamp(index).getTime());
+ } else if (dataType instanceof ByteType) {
+ return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ?
Byte.MAX_VALUE : row.getByte(index));
+ } else if (dataType instanceof ShortType) {
+ return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ?
Short.MAX_VALUE : row.getShort(index));
+ } else if (dataType instanceof DecimalType) {
+ // longValue() keeps only the integral part of the decimal.
+ return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ?
Long.MAX_VALUE : row.getDecimal(index).longValue());
+ } else if (dataType instanceof BooleanType) {
+ boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
+ return ZOrderingUtil.intTo8Byte(value ? 1 : 0);
+ } else if (dataType instanceof BinaryType) {
+ return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[]
{0} : (byte[]) row.get(index));
+ }
+ // Any other data type yields null and is removed by the filter below, i.e. the column
+ // silently does not take part in the ordering.
+ return null;
+ }).filter(f -> f != null).collect(Collectors.toList());
+ byte[][] zBytes = new byte[zBytesList.size()][];
+ for (int i = 0; i < zBytesList.size(); i++) {
+ zBytes[i] = zBytesList.get(i);
+ }
+ // New row = all original values followed by the interleaved z-value.
+ List<Object> zVaules = new ArrayList<>();
+
zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
+ zVaules.add(ZOrderingUtil.interleaving(zBytes, 8));
+ return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules));
+ }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true,
fileNum);
+ }
+
+ /**
+ * Appends a hilbert index to every row and sorts the rows by it.
+ * Each column listed in fieldMap (schema position -> field) is mapped to a long, the longs are
+ * turned into a 63-byte index via HilbertCurveUtils.indexBytes, and the index is added as an
+ * extra trailing value of the row. Rows are then sorted ascending on the value at position
+ * fieldNum into fileNum partitions.
+ * Null column values map to Long.MAX_VALUE, except for booleans where null is treated as false.
+ * NOTE(review): several branches can produce negative longs (negative numbers, and
+ * Double.doubleToLongBits of negative doubles sets the sign bit) - confirm how
+ * HilbertCurve.index treats ordinates outside the 63-bit non-negative range.
+ */
+ private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD,
Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
+ return originRDD.mapPartitions(rows -> {
+ // One curve per partition: 63 bits per dimension, one dimension per entry of fieldMap.
+ HilbertCurve hilbertCurve =
HilbertCurve.bits(63).dimensions(fieldMap.size());
+ // Lazily wraps the partition iterator; each row is converted when it is pulled.
+ return new Iterator<Row>() {
+
+ @Override
+ public boolean hasNext() {
+ return rows.hasNext();
+ }
+
+ @Override
+ public Row next() {
+ Row row = rows.next();
+ List<Long> longList = fieldMap.entrySet().stream().map(entry -> {
+ int index = entry.getKey();
+ StructField field = entry.getValue();
+ DataType dataType = field.dataType();
+ if (dataType instanceof LongType) {
+ return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
+ } else if (dataType instanceof DoubleType) {
+ return row.isNullAt(index) ? Long.MAX_VALUE :
Double.doubleToLongBits(row.getDouble(index));
+ } else if (dataType instanceof IntegerType) {
+ return row.isNullAt(index) ? Long.MAX_VALUE :
(long)row.getInt(index);
+ } else if (dataType instanceof FloatType) {
+ return row.isNullAt(index) ? Long.MAX_VALUE :
Double.doubleToLongBits((double) row.getFloat(index));
+ } else if (dataType instanceof StringType) {
+ return row.isNullAt(index) ? Long.MAX_VALUE :
ZOrderingUtil.convertStringToLong(row.getString(index));
+ } else if (dataType instanceof DateType) {
+ return row.isNullAt(index) ? Long.MAX_VALUE :
row.getDate(index).getTime();
+ } else if (dataType instanceof TimestampType) {
+ return row.isNullAt(index) ? Long.MAX_VALUE :
row.getTimestamp(index).getTime();
+ } else if (dataType instanceof ByteType) {
+ return row.isNullAt(index) ? Long.MAX_VALUE :
ZOrderingUtil.convertBytesToLong(new byte[] {row.getByte(index)});
+ } else if (dataType instanceof ShortType) {
+ return row.isNullAt(index) ? Long.MAX_VALUE :
(long)row.getShort(index);
+ } else if (dataType instanceof DecimalType) {
+ // longValue() keeps only the integral part of the decimal.
+ return row.isNullAt(index) ? Long.MAX_VALUE :
row.getDecimal(index).longValue();
+ } else if (dataType instanceof BooleanType) {
+ boolean value = row.isNullAt(index) ? false :
row.getBoolean(index);
+ return value ? Long.MAX_VALUE : 0;
+ } else if (dataType instanceof BinaryType) {
+ return row.isNullAt(index) ? Long.MAX_VALUE :
ZOrderingUtil.convertBytesToLong((byte[]) row.get(index));
+ }
+ // Any other data type yields null and is removed by the filter below.
+ // NOTE(review): the curve was dimensioned with fieldMap.size(), so a dropped column
+ // leaves fewer ordinates than dimensions - confirm HilbertCurve.index accepts that.
+ return null;
+ }).filter(f -> f != null).collect(Collectors.toList());
+
+ byte[] hilbertValue = HilbertCurveUtils.indexBytes(
+ hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);
+ // New row = all original values followed by the hilbert index bytes.
+ List<Object> values = new ArrayList<>();
+
values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
+ values.add(hilbertValue);
+ return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
+ }
+ };
+ }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true,
fileNum);
+ }
+
+ public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row>
df, String sortCols, int fileNum, String sortMode) {
+ if (sortCols == null || sortCols.isEmpty() || fileNum <= 0) {
+ return df;
+ }
+ return createOptimizedDataFrameByMapValue(df,
+ Arrays.stream(sortCols.split(",")).map(f ->
f.trim()).collect(Collectors.toList()), fileNum, sortMode);
+ }
+
+ /**
+  * Delegates to RangeSampleSort.sortDataFrameBySample after converting the column
+  * list into a Scala buffer.
+  */
+ public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum, String sortMode) {
+   scala.collection.mutable.Buffer<String> scalaCols = JavaConversions.asScalaBuffer(zCols);
+   return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, scalaCols, fileNum, sortMode);
+ }
+
+ public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df,
String zCols, int fileNum, String sortMode) {
+ if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
+ return df;
+ }
+ return createOptimizeDataFrameBySample(df,
Arrays.stream(zCols.split(",")).map(f ->
f.trim()).collect(Collectors.toList()), fileNum, sortMode);
+ }
+
+ /**
+ * Parses the min/max statistics stored in parquet footers for the given sort columns.
+ * Collecting statistics for TimestampType columns is not supported, since the parquet
+ * files do not carry statistics for that type; such a column ends in the
+ * "Not support type" exception below.
+ * TODO: adapt for RFC-27.
+ *
+ * The result has one row per input file: the file path, followed for every column by its
+ * min value, max value and null count (columns "file", "<col>_minValue", "<col>_maxValue",
+ * "<col>_num_nulls").
+ *
+ * @param df a Spark DataFrame backed by the parquet files to be read.
+ * @param cols sort columns to collect statistics for
+ * @return a DataFrame holding all statistics info.
+ */
+ public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String>
cols) {
+ Map<String, DataType> columnsMap =
Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e
-> e.dataType()));
+
+ List<String> scanFiles = Arrays.asList(df.inputFiles());
+ SparkContext sc = df.sparkSession().sparkContext();
+ JavaSparkContext jsc = new JavaSparkContext(sc);
+
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(sc.hadoopConfiguration());
+ // Roughly one Spark partition per three files, and always at least one.
+ int numParallelism = (scanFiles.size() / 3 + 1);
+ List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
+ // Remember the current job description so it can be restored once the listing job is done.
+ String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
+ try {
+ jsc.setJobDescription("Listing parquet column statistics");
+ colMinMaxInfos = jsc.parallelize(scanFiles,
numParallelism).mapPartitions(paths -> {
+ Configuration conf = serializableConfiguration.value();
+ ParquetUtils parquetUtils = (ParquetUtils)
BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
+ List<Collection<HoodieColumnRangeMetadata<Comparable>>> results = new
ArrayList<>();
+ while (paths.hasNext()) {
+ String path = paths.next();
+ results.add(parquetUtils.readRangeFromParquetMetadata(conf, new
Path(path), cols));
+ }
+ return results.stream().flatMap(f -> f.stream()).iterator();
+ }).collect();
+ } finally {
+ jsc.setJobDescription(previousJobDescription);
+ }
+
+ // Regroup the flat list of per-column statistics by the file they were read from.
+ Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e ->
e.getFilePath()));
+ JavaRDD<Row> allMetaDataRDD = jsc.parallelize(new
ArrayList<>(fileToStatsListMap.values()), 1).map(f -> {
+ int colSize = f.size();
+ if (colSize == 0) {
+ // No statistics for this file: emit null, which the filter after the map removes.
+ return null;
+ } else {
+ List<Object> rows = new ArrayList<>();
+ rows.add(f.get(0).getFilePath());
+ cols.stream().forEach(col -> {
+ // Statistics are matched to the column by trimmed, case-insensitive name, while the
+ // data type lookup below uses the column name exactly as given.
+ HoodieColumnRangeMetadata<Comparable> currentColRangeMetaData =
+ f.stream().filter(s ->
s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null);
+ DataType colType = columnsMap.get(col);
+ if (currentColRangeMetaData == null || colType == null) {
+ throw new HoodieException(String.format("cannot collect min/max
statistics for col: %s", col));
+ }
+ // Add min then max, converted where needed to the Java type matching the column's
+ // Spark data type.
+ if (colType instanceof IntegerType) {
+ rows.add(currentColRangeMetaData.getMinValue());
+ rows.add(currentColRangeMetaData.getMaxValue());
+ } else if (colType instanceof DoubleType) {
+ rows.add(currentColRangeMetaData.getMinValue());
+ rows.add(currentColRangeMetaData.getMaxValue());
+ } else if (colType instanceof StringType) {
+ rows.add(currentColRangeMetaData.getMinValue().toString());
+ rows.add(currentColRangeMetaData.getMaxValue().toString());
+ } else if (colType instanceof DecimalType) {
+ rows.add(new
BigDecimal(currentColRangeMetaData.getMinValue().toString()));
+ rows.add(new
BigDecimal(currentColRangeMetaData.getMaxValue().toString()));
+ } else if (colType instanceof DateType) {
+
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValue().toString()));
+
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValue().toString()));
+ } else if (colType instanceof LongType) {
+ rows.add(currentColRangeMetaData.getMinValue());
+ rows.add(currentColRangeMetaData.getMaxValue());
+ } else if (colType instanceof ShortType) {
+
rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString()));
+
rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString()));
+ } else if (colType instanceof FloatType) {
+ rows.add(currentColRangeMetaData.getMinValue());
+ rows.add(currentColRangeMetaData.getMaxValue());
+ } else if (colType instanceof BinaryType) {
+
rows.add(((Binary)currentColRangeMetaData.getMinValue()).getBytes());
+
rows.add(((Binary)currentColRangeMetaData.getMaxValue()).getBytes());
+ } else if (colType instanceof BooleanType) {
+ rows.add(currentColRangeMetaData.getMinValue());
+ rows.add(currentColRangeMetaData.getMaxValue());
+ } else if (colType instanceof ByteType) {
+
rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString()));
+
rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString()));
+ } else {
+ throw new HoodieException(String.format("Not support type: %s",
colType));
+ }
+ rows.add(currentColRangeMetaData.getNumNulls());
+ });
+ return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows));
+ }
+ }).filter(f -> f != null);
+ // Schema mirrors the row layout built above: file path, then min/max/null count per column.
+ List<StructField> allMetaDataSchema = new ArrayList<>();
+ allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true,
Metadata.empty()));
+ cols.forEach(col -> {
+ allMetaDataSchema.add(new StructField(col + "_minValue",
columnsMap.get(col), true, Metadata.empty()));
+ allMetaDataSchema.add(new StructField(col + "_maxValue",
columnsMap.get(col), true, Metadata.empty()));
+ allMetaDataSchema.add(new StructField(col + "_num_nulls",
LongType$.MODULE$, true, Metadata.empty()));
+ });
+ return df.sparkSession().createDataFrame(allMetaDataRDD,
StructType$.MODULE$.apply(allMetaDataSchema));
+ }
+
+ /**
+ * Convenience overload that accepts the columns as one comma-separated string.
+ * The string is split on ",", each name is trimmed, and the resulting list is
+ * passed to the list-based {@code getMinMaxValue} overload.
+ *
+ * @param df DataFrame forwarded unchanged to the list-based overload.
+ * @param cols comma-separated column names.
+ * @return whatever the list-based overload returns for the trimmed names.
+ */
+ public static Dataset<Row> getMinMaxValue(Dataset<Row> df, String cols) {
+ List<String> rawCols = Arrays.asList(cols.split(",")).stream().map(f ->
f.trim()).collect(Collectors.toList());
+ return getMinMaxValue(df, rawCols);
+ }
+
+ /**
+ * Computes min/max statistics for {@code cols} and saves them as the index
+ * table at {@code indexPath/commitTime}.
+ * <p>
+ * If {@code indexPath} does not exist yet, the statistics are written directly.
+ * Otherwise the sub-directories of {@code indexPath} are split into those named
+ * in {@code validateCommits} and the rest (residual). The latest valid table
+ * (last by sorted name) is loaded; older valid tables and all residual tables
+ * are deleted. When the loaded table has the same schema as the new statistics,
+ * both are merged with the SQL built by
+ * {@code ZOrderingIndexHelper.createIndexMergeSql}; otherwise the new statistics
+ * are written on their own.
+ *
+ * @param df a spark DataFrame holds parquet files to be read.
+ * @param cols comma-separated column names to collect statistics for.
+ * @param indexPath index store path.
+ * @param commitTime current operation commitTime; used as the directory name.
+ * @param validateCommits names of all valid commits for current table.
+ * @throws HoodieException wrapping any IOException raised by the file system calls.
+ */
+ public static void saveStatisticsInfo(Dataset<Row> df, String cols, String
indexPath, String commitTime, List<String> validateCommits) {
+ Path savePath = new Path(indexPath, commitTime);
+ SparkSession spark = df.sparkSession();
+ FileSystem fs = FSUtils.getFs(indexPath,
spark.sparkContext().hadoopConfiguration());
+ Dataset<Row> statisticsDF = OrderingIndexHelper.getMinMaxValue(df, cols);
+ // try to find the last valid index table from index path
+ try {
+ // If there's currently no index, create one
+ if (!fs.exists(new Path(indexPath))) {
+
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
+ return;
+ }
+
+ // Otherwise, clean up all indexes but the most recent one
+
+ List<String> allIndexTables = Arrays
+ .stream(fs.listStatus(new Path(indexPath))).filter(f ->
f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
+ List<String> candidateIndexTables = allIndexTables.stream().filter(f ->
validateCommits.contains(f)).sorted().collect(Collectors.toList());
+ List<String> residualTables = allIndexTables.stream().filter(f ->
!validateCommits.contains(f)).collect(Collectors.toList());
+ Option<Dataset> latestIndexData = Option.empty();
+ if (!candidateIndexTables.isEmpty()) {
+ latestIndexData = Option.of(spark.read().load(new Path(indexPath,
candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
+ // clean old index tables: the latest one is dropped from the list first, so only older ones are deleted.
+ candidateIndexTables.remove(candidateIndexTables.size() - 1);
+ candidateIndexTables.forEach(f -> {
+ try {
+ fs.delete(new Path(indexPath, f));
+ } catch (IOException ie) {
+ throw new HoodieException(ie);
+ }
+ });
+ }
+
+ // clean residualTables
+ // retried clustering operations at the same instant time are also covered:
+ // residual files produced by a retry are removed before the statistics
+ // are saved to the index table named after commitTime
+ residualTables.forEach(f -> {
+ try {
+ fs.delete(new Path(indexPath, f));
+ } catch (IOException ie) {
+ throw new HoodieException(ie);
+ }
+ });
+
+ if (latestIndexData.isPresent() &&
latestIndexData.get().schema().equals(statisticsDF.schema())) {
+ // update the statistics info
+ String originalTable = "indexTable_" +
java.util.UUID.randomUUID().toString().replace("-", "");
+ String updateTable = "updateTable_" +
java.util.UUID.randomUUID().toString().replace("-", "");
+ latestIndexData.get().registerTempTable(originalTable);
+ statisticsDF.registerTempTable(updateTable);
+ // update table by full outer join
+ List columns = Arrays.asList(statisticsDF.schema().fieldNames());
+ spark.sql(ZOrderingIndexHelper.createIndexMergeSql(originalTable,
updateTable, columns)).repartition(1).write().save(savePath.toString());
+ } else {
+
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
+ }
+ } catch (IOException e) {
+ throw new HoodieException(e);
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala
index c392f12..a168e55 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala
@@ -19,15 +19,16 @@
package org.apache.spark.sql.hudi.execution
import org.apache.hudi.config.HoodieClusteringConfig
+import org.apache.hudi.optimize.{HilbertCurveUtils, ZOrderingUtil}
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute,
BoundReference, SortOrder, UnsafeProjection, UnsafeRow}
-import org.apache.hudi.optimize.ZOrderingUtil
-import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import
org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute,
BoundReference, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.util.MutablePair
import org.apache.spark.util.random.SamplingUtils
+import org.davidmoten.hilbert.HilbertCurve
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -35,10 +36,10 @@ import scala.reflect.{ClassTag, classTag}
import scala.util.hashing.byteswap32
class RangeSample[K: ClassTag, V](
- zEncodeNum: Int,
- rdd: RDD[_ <: Product2[K, V]],
- private var ascend: Boolean = true,
- val samplePointsPerPartitionHint: Int = 20) extends Serializable {
+ zEncodeNum: Int,
+ rdd: RDD[_ <: Product2[K, V]],
+ private var ascend: Boolean = true,
+ val samplePointsPerPartitionHint: Int = 20)
extends Serializable {
// We allow zEncodeNum = 0, which happens when sorting an empty RDD under
the default settings.
require(zEncodeNum >= 0, s"Number of zEncodeNum cannot be negative but found
$zEncodeNum.")
@@ -335,16 +336,21 @@ object RangeSampleSort {
}
/**
- * create z-order DataFrame by sample
- * first, sample origin data to get z-cols bounds, then create z-order
DataFrame
+ * Create an optimized DataFrame by sampling.
+ * First, sample the original data to get the order-column bounds, then sort to
produce the DataFrame.
* support all type data.
- * this method need more resource and cost more time than
createZIndexedDataFrameByMapValue
+ * this method needs more resources and costs more time than
createOptimizedDataFrameByMapValue
*/
- def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int):
DataFrame = {
+ def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int,
sortMode: String): DataFrame = {
val spark = df.sparkSession
val columnsMap = df.schema.fields.map(item => (item.name, item)).toMap
val fieldNum = df.schema.fields.length
val checkCols = zCols.filter(col => columnsMap(col) != null)
+ val useHilbert = sortMode match {
+ case "hilbert" => true
+ case "z-order" => false
+ case other => throw new IllegalArgumentException(s"new only support
z-order/hilbert optimize but find: ${other}")
+ }
if (zCols.isEmpty || checkCols.isEmpty) {
df
@@ -366,7 +372,7 @@ object RangeSampleSort {
}.filter(_._1 != -1)
// Complex type found, use createZIndexedDataFrameByRange
if (zFields.length != zCols.length) {
- return sortDataFrameBySampleSupportAllTypes(df, zCols, fieldNum)
+ return sortDataFrameBySampleSupportAllTypes(df, zCols, fileNum)
}
val rawRdd = df.rdd
@@ -441,6 +447,7 @@ object RangeSampleSort {
val boundBroadCast =
spark.sparkContext.broadcast(expandSampleBoundsWithFactor)
val indexRdd = rawRdd.mapPartitions { iter =>
+ val hilbertCurve = if (useHilbert)
Some(HilbertCurve.bits(32).dimensions(zFields.length)) else None
val expandBoundsWithFactor = boundBroadCast.value
val maxBoundNum = expandBoundsWithFactor.map(_._1.length).max
val longDecisionBound = new RawDecisionBound(Ordering[Long])
@@ -507,17 +514,21 @@ object RangeSampleSort {
case _ =>
-1
}
- }.filter(v => v != -1).map(ZOrderingUtil.intTo8Byte(_)).toArray
- val zValues = ZOrderingUtil.interleaving(values, 8)
- Row.fromSeq(row.toSeq ++ Seq(zValues))
+ }.filter(v => v != -1)
+ val mapValues = if (hilbertCurve.isDefined) {
+ HilbertCurveUtils.indexBytes(hilbertCurve.get,
values.map(_.toLong).toArray, 32)
+ } else {
+
ZOrderingUtil.interleaving(values.map(ZOrderingUtil.intTo8Byte(_)).toArray, 8)
+ }
+ Row.fromSeq(row.toSeq ++ Seq(mapValues))
}
}.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)),
numPartitions = fileNum)
val newDF = df.sparkSession.createDataFrame(indexRdd, StructType(
df.schema.fields ++ Seq(
- StructField(s"zindex",
+ StructField(s"index",
BinaryType, false))
))
- newDF.drop("zindex")
+ newDF.drop("index")
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala
new file mode 100644
index 0000000..4b7864f
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.model.HoodieFileFormat
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils}
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.spark.OrderingIndexHelper
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.Arguments.arguments
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.sql.{Date, Timestamp}
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+@Tag("functional")
+class TestTableLayoutOptimization extends HoodieClientTestBase {
+ var spark: SparkSession = _
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ "hoodie.bulkinsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+ )
+
+ // Per-test setup: calls the init* helpers (defined outside this class) and
+ // takes the SparkSession from sqlContext for use by the tests.
+ @BeforeEach override def setUp() {
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initTestDataGenerator()
+ initFileSystem()
+ }
+
+ // Per-test teardown: calls the cleanup* helpers (defined outside this class)
+ // for the Spark contexts, the test data generator and the file system.
+ @AfterEach override def tearDown() = {
+ cleanupSparkContexts()
+ cleanupTestDataGenerator()
+ cleanupFileSystem()
+ }
+
+ // Bulk-inserts generated records with inline clustering and layout optimization
+ // (strategy = optimizeMode, sort columns begin_lat/begin_lon), then checks that
+ // reads with and without data skipping return the same count and the same rows
+ // for a range filter. Arguments are supplied by testLayOutParameter.
+ @ParameterizedTest
+ @MethodSource(Array("testLayOutParameter"))
+ def testOptimizewithClustering(tableType: String, optimizeMode: String):
Unit = {
+ val targetRecordsCount = 10000
+ // Bulk Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("001",
targetRecordsCount)).toList
+ val writeDf: Dataset[Row] =
spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ writeDf.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option("hoodie.compact.inline", "false")
+ .option(DataSourceWriteOptions.OPERATION.key(),
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
+ // option for clustering
+ .option("hoodie.parquet.small.file.limit", "0")
+ .option("hoodie.clustering.inline", "true")
+ .option("hoodie.clustering.inline.max.commits", "1")
+ .option("hoodie.clustering.plan.strategy.target.file.max.bytes",
"1073741824")
+ .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
+ .option("hoodie.clustering.plan.strategy.max.bytes.per.group",
Long.MaxValue.toString)
+ // NOTE(review): target.file.max.bytes was already set above; this later value presumably wins - confirm
+ .option("hoodie.clustering.plan.strategy.target.file.max.bytes",
String.valueOf(64 * 1024 * 1024L))
+ .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
+ .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(),
optimizeMode)
+ .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key,
"begin_lat, begin_lon")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ // plain snapshot read, used as the reference result
+ val readDf =
+ spark.read
+ .format("hudi")
+ .load(basePath)
+
+ // same read with the data-skipping option enabled
+ val readDfSkip =
+ spark.read
+ .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
+ .format("hudi")
+ .load(basePath)
+
+ assertEquals(targetRecordsCount, readDf.count())
+ assertEquals(targetRecordsCount, readDfSkip.count())
+
+ readDf.createOrReplaceTempView("hudi_snapshot_raw")
+ readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping")
+
+ // range filter on both sort columns, run against either temp view
+ def select(tableName: String) =
+ spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND
begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51")
+
+ assertRowsMatch(
+ select("hudi_snapshot_raw"),
+ select("hudi_snapshot_skipping")
+ )
+ }
+
+ // Asserts that both DataFrames have the same row count and that the count of
+ // their intersection equals that row count as well.
+ def assertRowsMatch(one: DataFrame, other: DataFrame) = {
+ val rows = one.count()
+ assert(rows == other.count() && one.intersect(other).count() == rows)
+ }
+
+ // Writes the complex test DataFrame as parquet, runs both sort paths
+ // (by map value and by sample) for hilbert and z-order, then collects min/max
+ // statistics and exercises saveStatisticsInfo across several commit names.
+ @Test
+ def testCollectMinMaxStatistics(): Unit = {
+ val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax")
+ val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat")
+ val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ val complexDataFrame = createComplexDataFrame(spark)
+
complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
+ val df = spark.read.load(testPath.toString)
+ try {
+ // test z-order/hilbert sort for all primitive types
+ // should not throw an exception.
+ OrderingIndexHelper.createOptimizedDataFrameByMapValue(df,
"c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1)
+ OrderingIndexHelper.createOptimizedDataFrameByMapValue(df,
"c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1)
+ OrderingIndexHelper.createOptimizeDataFrameBySample(df,
"c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1)
+ OrderingIndexHelper.createOptimizeDataFrameBySample(df,
"c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1)
+ try {
+ // TimestampType is not supported by getMinMaxValue (it would throw), so c4
+ // is left out of the column list
+ val colDf = OrderingIndexHelper.getMinMaxValue(df,
"c1,c2,c3,c5,c6,c7,c8")
+ colDf.cache()
+ // presumably one statistics row per parquet file written by repartition(3)
+ assertEquals(colDf.count(), 3)
+ // 22 columns = 1 file column + 7 columns * (minValue, maxValue, num_nulls)
+ assertEquals(colDf.take(1)(0).length, 22)
+ colDf.unpersist()
+ // try to save statistics
+ OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8",
statisticPath.toString, "2", Seq("0", "1"))
+ // save again
+ OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8",
statisticPath.toString, "3", Seq("0", "1", "2"))
+ // test old index table clean: "2" is not in the valid list, so it is removed
+ // as residual, while "3" is the latest valid table and is kept
+ OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8",
statisticPath.toString, "4", Seq("0", "1", "3"))
+ assertEquals(!fs.exists(new Path(statisticPath, "2")), true)
+ assertEquals(fs.exists(new Path(statisticPath, "3")), true)
+ // test saving a different index: the new index on ("c1,c6,c7,c8") should
+ // be saved successfully.
+ OrderingIndexHelper.saveStatisticsInfo(df, "c1,c6,c7,c8",
statisticPath.toString, "5", Seq("0", "1", "3", "4"))
+ assertEquals(fs.exists(new Path(statisticPath, "5")), true)
+ } finally {
+ // remove the temporary data and statistics directories
+ if (fs.exists(testPath)) fs.delete(testPath)
+ if (fs.exists(statisticPath)) fs.delete(statisticPath)
+ }
+ }
+ }
+
+ // Collects min/max statistics for a DateType column while reading parquet
+ // footers from several threads. Per the original note, parquet can give wrong
+ // DateType statistics under multithreading; this test checks that a parallel
+ // read through readRangeFromParquetMetadata matches a sequential read.
+ @Test
+ def testMultiThreadParquetFooterReadForDateType(): Unit = {
+ // create parquet file with DateType
+ val rdd = spark.sparkContext.parallelize(0 to 100, 1)
+ .map(item => RowFactory.create(Date.valueOf(s"${2020}-${item % 11 +
1}-${item % 28 + 1}")))
+ val df = spark.createDataFrame(rdd, new StructType().add("id", DateType))
+ val testPath = new Path(System.getProperty("java.io.tmpdir"),
"testCollectDateType")
+ val conf = spark.sparkContext.hadoopConfiguration
+ val cols = new java.util.ArrayList[String]
+ cols.add("id")
+ try {
+ df.repartition(3).write.mode("overwrite").save(testPath.toString)
+ // sort the file names so both passes below use the same index per file
+ val inputFiles = spark.read.load(testPath.toString).inputFiles.sortBy(x
=> x)
+
+ // sequential read: reference (min, max) per file
+ val realResult = new Array[(String, String)](3)
+ inputFiles.zipWithIndex.foreach { case (f, index) =>
+ val fileUtils =
BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
+ val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f),
cols).iterator().next()
+ realResult(index) = (res.getMinValue.toString,
res.getMaxValue.toString)
+ }
+
+ // parallel read (.par) of the same files; results are stored in resUseLock
+ // NOTE(review): the name suggests readRangeFromParquetMetadata locks internally - confirm
+ val resUseLock = new Array[(String, String)](3)
+ inputFiles.zipWithIndex.par.foreach { case (f, index) =>
+ val fileUtils =
BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
+ val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f),
cols).iterator().next()
+ resUseLock(index) = (res.getMinValue.toString,
res.getMaxValue.toString)
+ }
+
+ // A variant without locking (resUseNoLock) is not part of this test:
+ // per the original note, a failure under multithreading cannot be
+ // guaranteed to reproduce, so it is not checked to keep the test stable.
+ // Only resUseLock is compared with the sequential result,
+ // and this assert should pass.
+ realResult.zip(resUseLock).foreach { case (realValue, testValue) =>
+ assert(realValue == testValue, s" expect realValue: ${realValue} but
find ${testValue}")
+ }
+ } finally {
+ // NOTE(review): fs is not defined in this method; presumably an inherited field - confirm
+ if (fs.exists(testPath)) fs.delete(testPath)
+ }
+ }
+
+ // Builds a single-partition DataFrame with one row per item in 0 to 1000 and
+ // eight columns c1..c8 covering Integer, String, Decimal(9,3), Timestamp,
+ // Short, Date, Binary and Byte types.
+ def createComplexDataFrame(spark: SparkSession): DataFrame = {
+ val schema = new StructType()
+ .add("c1", IntegerType)
+ .add("c2", StringType)
+ .add("c3", DecimalType(9,3))
+ .add("c4", TimestampType)
+ .add("c5", ShortType)
+ .add("c6", DateType)
+ .add("c7", BinaryType)
+ .add("c8", ByteType)
+
+ val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item =>
+ val c1 = Integer.valueOf(item)
+ val c2 = s" ${item}sdc"
+ // random integer part, item as the fractional digits
+ val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}")
+ val c4 = new Timestamp(System.currentTimeMillis())
+ val c5 = java.lang.Short.valueOf(s"${(item + 16) /10}")
+ // month in 1..11 and day in 1..28
+ val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")
+ val c7 = Array(item).map(_.toByte)
+ val c8 = java.lang.Byte.valueOf("9")
+
+ RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8)
+ }
+ spark.createDataFrame(rdd, schema)
+ }
+}
+
+// Companion object holding the argument source named by
+// @MethodSource("testLayOutParameter") on testOptimizewithClustering.
+object TestTableLayoutOptimization {
+ // Each entry is (tableType, optimizeMode): both table types with both modes.
+ def testLayOutParameter(): java.util.stream.Stream[Arguments] = {
+ java.util.stream.Stream.of(
+ arguments("COPY_ON_WRITE", "hilbert"),
+ arguments("COPY_ON_WRITE", "z-order"),
+ arguments("MERGE_ON_READ", "hilbert"),
+ arguments("MERGE_ON_READ", "z-order")
+ )
+ }
+}
+
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala
new file mode 100644
index 0000000..c8263b3
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.OrderingIndexHelper
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.hudi.TestHoodieSqlBase
+
+import scala.util.Random
+
+object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase {
+
+ // Prints the share of statistics rows that can be skipped for the filter
+ // "co1 = value1 or co2 = value2": a row counts as a hit when value1 lies in
+ // its [min, max] range for co1 or value2 lies in its range for co2, and the
+ // printed skipping percent is 1 - hits / rows.
+ def getSkippingPercent(tableName: String, co1: String, co2: String, value1:
Int, value2: Int): Unit= {
+ // row layout from getMinMaxValue: 0 = file, 1/2 = co1 min/max,
+ // 3 = co1 num_nulls, 4/5 = co2 min/max
+ val minMax = OrderingIndexHelper
+ .getMinMaxValue(spark.sql(s"select * from ${tableName}"), s"${co1},
${co2}")
+ .collect().map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5)))
+ var c = 0
+ for (elem <- minMax) {
+ if ((elem._1 <= value1 && elem._2 >= value1) || (elem._3 <= value2 &&
elem._4 >= value2)) {
+ c = c + 1
+ }
+ }
+
+ // fraction of rows that must be read
+ val p = c / minMax.size.toDouble
+ println(s"for table ${tableName} with query filter: ${co1} = ${value1} or
${co2} = ${value2} we can achieve skipping percent ${1.0 - p}")
+ }
+
+ /*
+ for table table_z_sort_byMap with query filter: c1_int = 500000 or c2_int =
500000 we can achieve skipping percent 0.8
+ for table table_z_sort_bySample with query filter: c1_int = 500000 or c2_int
= 500000 we can achieve skipping percent 0.77
+ for table table_hilbert_sort_byMap with query filter: c1_int = 500000 or
c2_int = 500000 we can achieve skipping percent 0.855
+ for table table_hilbert_sort_bySample with query filter: c1_int = 500000 or
c2_int = 500000 we can achieve skipping percent 0.83
+ */
+ // Prepares the four sorted tables with the default column ranges and prints
+ // the skipping percent of each for the filter value 500000 on both columns.
+ def runNormalTableSkippingBenchMark(): Unit = {
+ withTempDir { f =>
+ withTempTable("table_z_sort_byMap", "table_z_sort_bySample",
"table_hilbert_sort_byMap", "table_hilbert_sort_bySample") {
+ prepareInterTypeTable(new Path(f.getAbsolutePath), 1000000)
+ // choose median value as filter condition.
+ // the median value of c1_int is 500000
+ // the median value of c2_int is 500000
+ getSkippingPercent("table_z_sort_byMap", "c1_int", "c2_int", 500000,
500000)
+ getSkippingPercent("table_z_sort_bySample", "c1_int", "c2_int",
500000, 500000)
+ getSkippingPercent("table_hilbert_sort_byMap", "c1_int", "c2_int",
500000, 500000)
+ getSkippingPercent("table_hilbert_sort_bySample", "c1_int", "c2_int",
500000, 500000)
+ }
+ }
+ }
+
+ /*
+ for table table_z_sort_byMap_skew with query filter: c1_int = 5000 or c2_int
= 500000 we can achieve skipping percent 0.0
+ for table table_z_sort_bySample_skew with query filter: c1_int = 5000 or
c2_int = 500000 we can achieve skipping percent 0.78
+ for table table_hilbert_sort_byMap_skew with query filter: c1_int = 5000 or
c2_int = 500000 we can achieve skipping percent 0.05500000000000005
+ for table table_hilbert_sort_bySample_skew with query filter: c1_int = 5000
or c2_int = 500000 we can achieve skipping percent 0.84
+ */
+ // Same as the normal benchmark, but c1_int is drawn from a range of 10000
+ // while c2_int keeps a range of 1000000, and the tables use the _skew names.
+ def runSkewTableSkippingBenchMark(): Unit = {
+ withTempDir { f =>
+ withTempTable("table_z_sort_byMap_skew", "table_z_sort_bySample_skew",
"table_hilbert_sort_byMap_skew", "table_hilbert_sort_bySample_skew") {
+ // prepare skewed table.
+ prepareInterTypeTable(new Path(f.getAbsolutePath), 1000000, 10000,
1000000, true)
+ // choose median value as filter condition.
+ // the median value of c1_int is 5000
+ // the median value of c2_int is 500000
+ getSkippingPercent("table_z_sort_byMap_skew", "c1_int", "c2_int",
5000, 500000)
+ getSkippingPercent("table_z_sort_bySample_skew", "c1_int", "c2_int",
5000, 500000)
+ getSkippingPercent("table_hilbert_sort_byMap_skew", "c1_int",
"c2_int", 5000, 500000)
+ getSkippingPercent("table_hilbert_sort_bySample_skew", "c1_int",
"c2_int", 5000, 500000)
+ }
+ }
+ }
+
+ // Entry point: runs the normal benchmark followed by the skewed one.
+ def main(args: Array[String]): Unit = {
+ runNormalTableSkippingBenchMark()
+ runSkewTableSkippingBenchMark()
+ }
+
+ // Runs f and afterwards drops every temp view named in tableNames, even when
+ // f throws (the drop happens in the finally clause).
+ def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+ try f finally tableNames.foreach(spark.catalog.dropTempView)
+ }
+
+ // Generates numRows rows of random ints (c1_int below col1Range, c2_int below
+ // col2Range), sorts them four ways (z-order/hilbert, each by map value and by
+ // sample) and saves each result as a temp table under tablePath.
+ // Note: skewed only selects the "_skew" table-name suffixes; the value
+ // distribution itself is controlled by col1Range and col2Range.
+ def prepareInterTypeTable(tablePath: Path, numRows: Int, col1Range: Int =
1000000, col2Range: Int = 1000000, skewed: Boolean = false): Unit = {
+ import spark.implicits._
+ val df = spark.range(numRows).map(_ => (Random.nextInt(col1Range),
Random.nextInt(col2Range))).toDF("c1_int", "c2_int")
+ val dfOptimizeByMap =
OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int",
200, "z-order")
+ val dfOptimizeBySample =
OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200,
"z-order")
+
+ val dfHilbertOptimizeByMap =
OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int",
200, "hilbert")
+ val dfHilbertOptimizeBySample =
OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200,
"hilbert")
+
+ saveAsTable(dfOptimizeByMap, tablePath, if (skewed) "z_sort_byMap_skew"
else "z_sort_byMap")
+ saveAsTable(dfOptimizeBySample, tablePath, if (skewed)
"z_sort_bySample_skew" else "z_sort_bySample")
+ saveAsTable(dfHilbertOptimizeByMap, tablePath, if (skewed)
"hilbert_sort_byMap_skew" else "hilbert_sort_byMap")
+ saveAsTable(dfHilbertOptimizeBySample, tablePath, if (skewed)
"hilbert_sort_bySample_skew" else "hilbert_sort_bySample")
+ }
+
+ // Writes df to savePath/suffix (overwrite mode), reads it back as parquet and
+ // registers the result as the temp view "table_" + suffix.
+ def saveAsTable(df: DataFrame, savePath: Path, suffix: String): Unit = {
+
+ df.write.mode("overwrite").save(new Path(savePath, suffix).toString)
+ spark.read.parquet(new Path(savePath,
suffix).toString).createOrReplaceTempView("table_" + suffix)
+ }
+}
+
diff --git a/packaging/hudi-flink-bundle/pom.xml
b/packaging/hudi-flink-bundle/pom.xml
index 5f45fdd..1766a98 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -107,6 +107,8 @@
<include>com.fasterxml.jackson.core:jackson-databind</include>
<include>com.fasterxml.jackson.core:jackson-core</include>
+ <include>com.github.davidmoten:guava-mini</include>
+ <include>com.github.davidmoten:hilbert-curve</include>
<include>com.twitter:bijection-avro_${scala.binary.version}</include>
<include>com.twitter:bijection-core_${scala.binary.version}</include>
<include>io.dropwizard.metrics:metrics-core</include>
diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml
b/packaging/hudi-kafka-connect-bundle/pom.xml
index cd7b151..b8a4bcd 100644
--- a/packaging/hudi-kafka-connect-bundle/pom.xml
+++ b/packaging/hudi-kafka-connect-bundle/pom.xml
@@ -89,6 +89,8 @@
<include>org.apache.flink:flink-core</include>
<include>org.apache.flink:flink-hadoop-compatibility_${scala.binary.version}</include>
+
<include>com.github.davidmoten:guava-mini</include>
+
<include>com.github.davidmoten:hilbert-curve</include>
<include>com.yammer.metrics:metrics-core</include>
<include>com.beust:jcommander</include>
<include>io.javalin:javalin</include>
diff --git a/packaging/hudi-spark-bundle/pom.xml
b/packaging/hudi-spark-bundle/pom.xml
index 32a9abf..02d11be 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -88,6 +88,8 @@
<include>org.antlr:stringtemplate</include>
<include>org.apache.parquet:parquet-avro</include>
+ <include>com.github.davidmoten:guava-mini</include>
+ <include>com.github.davidmoten:hilbert-curve</include>
<include>com.twitter:bijection-avro_${scala.binary.version}</include>
<include>com.twitter:bijection-core_${scala.binary.version}</include>
<include>io.dropwizard.metrics:metrics-core</include>
diff --git a/packaging/hudi-utilities-bundle/pom.xml
b/packaging/hudi-utilities-bundle/pom.xml
index 568e9d5..798c27f 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -117,6 +117,8 @@
<include>com.amazonaws:aws-java-sdk-dynamodb</include>
<include>com.amazonaws:aws-java-sdk-core</include>
+ <include>com.github.davidmoten:guava-mini</include>
+ <include>com.github.davidmoten:hilbert-curve</include>
<include>com.twitter:bijection-avro_${scala.binary.version}</include>
<include>com.twitter:bijection-core_${scala.binary.version}</include>
<include>io.confluent:kafka-avro-serializer</include>