vinothchandar commented on a change in pull request #4026:
URL: https://github.com/apache/hudi/pull/4026#discussion_r755660517
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -160,41 +160,92 @@ case class HoodieFileIndex(
spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
"false")).toBoolean
}
- private def filterFilesByDataSkippingIndex(dataFilters: Seq[Expression]):
Set[String] = {
- var allFiles: Set[String] = Set.empty
- var candidateFiles: Set[String] = Set.empty
+ /**
+ * Computes pruned list of candidate base-files' names based on provided
list of {@link dataFilters}
+ * conditions, by leveraging custom Z-order index (Z-index) bearing "min",
"max", "num_nulls" statistic
+ * for all clustered columns
+ *
+ * NOTE: This method has to return complete set of candidate files, since
only provided candidates will
+ * ultimately be scanned as part of query execution. Hence, this
method has to maintain the
+ * invariant of conservatively including every base-file's name, that
is NOT referenced in its index.
+ *
+ * @param dataFilters list of original data filters passed down from
querying engine
+ * @return list of pruned (data-skipped) candidate base-files' names
+ */
+ private def lookupCandidateFilesNamesInZIndex(dataFilters: Seq[Expression]):
Option[Set[String]] = {
Review comment:
Rename this method per the suggestion above.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java
##########
@@ -299,50 +298,54 @@ public static void saveStatisticsInfo(Dataset<Row> df,
String cols, String index
Dataset<Row> statisticsDF = ZCurveOptimizeHelper.getMinMaxValue(df, cols);
// try to find last validate index table from index path
try {
- if (fs.exists(new Path(indexPath))) {
- List<String> allIndexTables = Arrays
- .stream(fs.listStatus(new Path(indexPath))).filter(f ->
f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
- List<String> candidateIndexTables = allIndexTables.stream().filter(f
-> validateCommits.contains(f)).sorted().collect(Collectors.toList());
- List<String> residualTables = allIndexTables.stream().filter(f ->
!validateCommits.contains(f)).collect(Collectors.toList());
- Option<Dataset> latestIndexData = Option.empty();
- if (!candidateIndexTables.isEmpty()) {
- latestIndexData = Option.of(spark.read().load(new Path(indexPath,
candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
- // clean old index table, keep at most 1 index table.
- candidateIndexTables.remove(candidateIndexTables.size() - 1);
- candidateIndexTables.forEach(f -> {
- try {
- fs.delete(new Path(indexPath, f));
- } catch (IOException ie) {
- throw new HoodieException(ie);
- }
- });
- }
+ // If there's currently no index, create one
+ if (!fs.exists(new Path(indexPath))) {
+
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
+ return;
+ }
- // clean residualTables
- // retried cluster operations at the same instant time is also
considered,
- // the residual files produced by retried are cleaned up before save
statistics
- // save statistics info to index table which named commitTime
- residualTables.forEach(f -> {
+ // Otherwise, clean up all indexes but the most recent one
+
+ List<String> allIndexTables = Arrays
+ .stream(fs.listStatus(new Path(indexPath))).filter(f ->
f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
+ List<String> candidateIndexTables = allIndexTables.stream().filter(f ->
validateCommits.contains(f)).sorted().collect(Collectors.toList());
+ List<String> residualTables = allIndexTables.stream().filter(f ->
!validateCommits.contains(f)).collect(Collectors.toList());
+ Option<Dataset> latestIndexData = Option.empty();
+ if (!candidateIndexTables.isEmpty()) {
+ latestIndexData = Option.of(spark.read().load(new Path(indexPath,
candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
+ // clean old index table, keep at most 1 index table.
+ candidateIndexTables.remove(candidateIndexTables.size() - 1);
+ candidateIndexTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
+ }
- if (latestIndexData.isPresent() &&
latestIndexData.get().schema().equals(statisticsDF.schema())) {
- // update the statistics info
- String originalTable = "indexTable_" +
java.util.UUID.randomUUID().toString().replace("-", "");
- String updateTable = "updateTable_" +
java.util.UUID.randomUUID().toString().replace("-", "");
- latestIndexData.get().registerTempTable(originalTable);
- statisticsDF.registerTempTable(updateTable);
- // update table by full out join
- List columns = Arrays.asList(statisticsDF.schema().fieldNames());
- spark.sql(HoodieSparkUtils$
- .MODULE$.createMergeSql(originalTable, updateTable,
JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString());
- } else {
-
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
+ // clean residualTables
+ // retried cluster operations at the same instant time is also
considered,
+ // the residual files produced by retried are cleaned up before save
statistics
+ // save statistics info to index table which named commitTime
+ residualTables.forEach(f -> {
+ try {
+ fs.delete(new Path(indexPath, f));
+ } catch (IOException ie) {
+ throw new HoodieException(ie);
Review comment:
Can we add an error message to the exception to provide some specific context?
I understand that this code existed prior to this change.
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala
##########
@@ -83,30 +87,41 @@ class TestOptimizeTable extends HoodieClientTestBase {
.option("hoodie.clustering.plan.strategy.target.file.max.bytes",
"1073741824")
.option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
.option("hoodie.clustering.plan.strategy.max.bytes.per.group",
Long.MaxValue.toString)
- .option("hoodie.clustering.plan.strategy.target.file.max.bytes",
String.valueOf(64 *1024 * 1024L))
+ .option("hoodie.clustering.plan.strategy.target.file.max.bytes",
String.valueOf(64 * 1024 * 1024L))
.option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
.option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key,
"begin_lat, begin_lon")
.mode(SaveMode.Overwrite)
.save(basePath)
- assertEquals(1000, spark.read.format("hudi").load(basePath).count())
- // use unsorted col as filter.
- assertEquals(spark.read
- .format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and
weight > 0.0").count(),
- spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
"true")
- .format("hudi").load(basePath).where("end_lat >= 0 and rider != '1'
and weight > 0.0").count())
- // use sorted col as filter.
- assertEquals(spark.read.format("hudi").load(basePath)
- .where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and
begin_lon < 0.51").count(),
- spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
"true")
- .format("hudi").load(basePath)
- .where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49
and begin_lon < 0.51").count())
- // use sorted cols and unsorted cols as filter
- assertEquals(spark.read.format("hudi").load(basePath)
- .where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat >
0.56").count(),
- spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
"true")
- .format("hudi").load(basePath)
- .where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat >
0.56").count())
+ val readDf =
+ spark.read
+ .format("hudi")
+ .load(basePath)
+
+ val readDfSkip =
+ spark.read
+ .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
+ .format("hudi")
+ .load(basePath)
+
+ assertEquals(targetRecordsCount, readDf.count())
+ assertEquals(targetRecordsCount, readDfSkip.count())
+
+ readDf.createOrReplaceTempView("hudi_snapshot_raw")
+ readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping")
+
+ def select(tableName: String) =
+ spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND
begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51")
+
+ assertRowsMatch(
+ select("hudi_snapshot_raw"),
+ select("hudi_snapshot_skipping")
+ )
+ }
+
+ def assertRowsMatch(one: DataFrame, other: DataFrame) = {
Review comment:
Don't we have any helpers for this already? If not, let's move this into a shared helper.
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala
##########
@@ -83,30 +87,41 @@ class TestOptimizeTable extends HoodieClientTestBase {
.option("hoodie.clustering.plan.strategy.target.file.max.bytes",
"1073741824")
.option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
.option("hoodie.clustering.plan.strategy.max.bytes.per.group",
Long.MaxValue.toString)
- .option("hoodie.clustering.plan.strategy.target.file.max.bytes",
String.valueOf(64 *1024 * 1024L))
+ .option("hoodie.clustering.plan.strategy.target.file.max.bytes",
String.valueOf(64 * 1024 * 1024L))
.option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
.option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key,
"begin_lat, begin_lon")
.mode(SaveMode.Overwrite)
.save(basePath)
- assertEquals(1000, spark.read.format("hudi").load(basePath).count())
- // use unsorted col as filter.
- assertEquals(spark.read
- .format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and
weight > 0.0").count(),
- spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
"true")
- .format("hudi").load(basePath).where("end_lat >= 0 and rider != '1'
and weight > 0.0").count())
- // use sorted col as filter.
- assertEquals(spark.read.format("hudi").load(basePath)
- .where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and
begin_lon < 0.51").count(),
- spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
"true")
- .format("hudi").load(basePath)
- .where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49
and begin_lon < 0.51").count())
- // use sorted cols and unsorted cols as filter
- assertEquals(spark.read.format("hudi").load(basePath)
- .where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat >
0.56").count(),
- spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
"true")
- .format("hudi").load(basePath)
- .where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat >
0.56").count())
+ val readDf =
Review comment:
Can we ensure the clustering actually ran before we validate? Otherwise,
any scheduling bugs would still let this test pass.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -206,18 +257,22 @@ case class HoodieFileIndex(
*/
override def listFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]):
Seq[PartitionDirectory] = {
- // try to load filterFiles from index
- val filterFiles: Set[String] = if (enableDataSkipping()) {
- filterFilesByDataSkippingIndex(dataFilters)
- } else {
- Set.empty
- }
+ // Look up candidate files names in the Z-index, if all of the following
conditions are true
+ // - Data-skipping is enabled
+ // - Z-index is present
+ // - List of predicates (filters) is present
+ val candidateFilesNamesOpt: Option[Set[String]] =
lookupCandidateFilesNamesInZIndex(dataFilters)
+
+ logDebug(s"Overlapping candidate files (from Z-index):
${candidateFilesNamesOpt.getOrElse(Set.empty)}")
+
if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
- val candidateFiles = if (!filterFiles.isEmpty) {
- allFiles.filterNot(fileStatus =>
filterFiles.contains(fileStatus.getPath.getName))
- } else {
- allFiles
- }
+ // Filter in candidate files based on the Z-index lookup
+ val candidateFiles =
+ allFiles.filter(fileStatus =>
+ // NOTE: This predicate is true when {@code Option} is empty
Review comment:
So by default — with data skipping off — this is how none of the files
in `allFiles` are filtered out.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
##########
@@ -36,120 +36,153 @@ import scala.collection.JavaConverters._
object DataSkippingUtils {
/**
- * create z_index filter and push those filters to index table to filter
all candidate scan files.
- * @param condition origin filter from query.
- * @param indexSchema schema from index table.
- * @return filters for index table.
- */
- def createZindexFilter(condition: Expression, indexSchema: StructType):
Expression = {
- def buildExpressionInternal(colName: Seq[String], statisticValue: String):
Expression = {
- val appendColName = UnresolvedAttribute(colName).name + statisticValue
- col(appendColName).expr
- }
-
- def reWriteCondition(colName: Seq[String], conditionExpress: Expression):
Expression = {
- val appendColName = UnresolvedAttribute(colName).name + "_minValue"
- if (indexSchema.exists(p => p.name == appendColName)) {
+ * Translates provided {@link filterExpr} into corresponding
filter-expression for Z-index index table
+ * to filter out candidate files that would hold records matching the
original filter
+ *
+ * @param filterExpr original filter from query
+ * @param indexSchema index table schema
+ * @return filter for Z-index table
+ */
+ def createZIndexLookupFilter(filterExpr: Expression, indexSchema:
StructType): Expression = {
Review comment:
This is general space-filling/data-skipping filtering, correct? Can
we remove z-order from the names of all these helpers, methods, and classes? I.e.,
data skipping will help across any space-filling curve, and ultimately, when we
integrate with RFC-27/the metadata table for the column stats, it will work across
any sort-based clustering as well. So it might be good to keep this generic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]