xiarixiaoyao commented on a change in pull request #4060:
URL: https://github.com/apache/hudi/pull/4060#discussion_r753778621
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -284,55 +288,102 @@ public Boolean apply(String recordKey) {
/**
* Parse min/max statistics stored in parquet footers for all columns.
*/
- public Collection<HoodieColumnRangeMetadata<Comparable>>
readRangeFromParquetMetadata(Configuration conf, Path parquetFilePath,
List<String> cols) {
+ public List<HoodieColumnRangeMetadata<Comparable>>
readRangeFromParquetMetadata(
+ @Nonnull Configuration conf,
+ @Nonnull Path parquetFilePath,
+ @Nonnull List<String> cols
+ ) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
- // collect stats from all parquet blocks
- Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
- return blockMetaData.getColumns().stream().filter(f ->
cols.contains(f.getPath().toDotString())).map(columnChunkMetaData ->
- new HoodieColumnRangeMetadata<>(parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(),
- columnChunkMetaData.getStatistics().genericGetMin(),
- columnChunkMetaData.getStatistics().genericGetMax(),
- columnChunkMetaData.getStatistics().getNumNulls(),
- columnChunkMetaData.getPrimitiveType().stringifier()));
- }).collect(Collectors.groupingBy(e -> e.getColumnName()));
-
- // we only intend to keep file level statistics.
- return new ArrayList<>(columnToStatsListMap.values().stream()
- .map(blocks -> getColumnRangeInFile(blocks))
- .collect(Collectors.toList()));
+ // Collect stats from all individual Parquet blocks
+ Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap =
+ metadata.getBlocks()
+ .stream()
+ .sequential()
+ .flatMap(blockMetaData ->
+ blockMetaData.getColumns()
+ .stream()
+ .filter(f -> cols.contains(f.getPath().toDotString()))
+ .map(columnChunkMetaData ->
+ new HoodieColumnRangeMetadata<Comparable>(
+ parquetFilePath.getName(),
+ columnChunkMetaData.getPath().toDotString(),
+ convertToNativeJavaType(
+ columnChunkMetaData.getPrimitiveType(),
+
columnChunkMetaData.getStatistics().genericGetMin()),
+ convertToNativeJavaType(
+ columnChunkMetaData.getPrimitiveType(),
+
columnChunkMetaData.getStatistics().genericGetMax()),
+ columnChunkMetaData.getStatistics().getNumNulls(),
+
columnChunkMetaData.getPrimitiveType().stringifier()))
+ )
+
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+
+ // Combine those into file-level statistics
+ // NOTE: Inlining this var makes javac (1.8) upset (due to its inability
to infer
+ // expression type correctly)
+ Stream<HoodieColumnRangeMetadata<Comparable>> stream =
columnToStatsListMap.values()
+ .stream()
+ .map(this::getColumnRangeInFile);
+
+ return stream.collect(Collectors.toList());
}
- private HoodieColumnRangeMetadata<Comparable> getColumnRangeInFile(final
List<HoodieColumnRangeMetadata<Comparable>> blockRanges) {
+ private <T extends Comparable<T>> HoodieColumnRangeMetadata<T>
getColumnRangeInFile(
+ @Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges
+ ) {
if (blockRanges.size() == 1) {
// only one block in parquet file. we can just return that range.
return blockRanges.get(0);
- } else {
- // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
- return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1,
b2)).get();
}
+
+ // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
+ return blockRanges.stream()
+ .sequential()
+ .reduce(this::combineRanges).get();
}
- private HoodieColumnRangeMetadata<Comparable>
combineRanges(HoodieColumnRangeMetadata<Comparable> range1,
-
HoodieColumnRangeMetadata<Comparable> range2) {
- final Comparable minValue;
- final Comparable maxValue;
- if (range1.getMinValue() != null && range2.getMinValue() != null) {
- minValue = range1.getMinValue().compareTo(range2.getMinValue()) < 0 ?
range1.getMinValue() : range2.getMinValue();
- } else if (range1.getMinValue() == null) {
- minValue = range2.getMinValue();
+ private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
+ HoodieColumnRangeMetadata<T> one,
+ HoodieColumnRangeMetadata<T> another
+ ) {
+ final T minValue;
+ final T maxValue;
+ if (one.getMinValue() != null && another.getMinValue() != null) {
+ minValue = one.getMinValue().compareTo(another.getMinValue()) < 0 ?
one.getMinValue() : another.getMinValue();
+ } else if (one.getMinValue() == null) {
+ minValue = another.getMinValue();
} else {
- minValue = range1.getMinValue();
+ minValue = one.getMinValue();
}
- if (range1.getMaxValue() != null && range2.getMaxValue() != null) {
- maxValue = range1.getMaxValue().compareTo(range2.getMaxValue()) < 0 ?
range2.getMaxValue() : range1.getMaxValue();
- } else if (range1.getMaxValue() == null) {
- maxValue = range2.getMaxValue();
+ if (one.getMaxValue() != null && another.getMaxValue() != null) {
+ maxValue = one.getMaxValue().compareTo(another.getMaxValue()) < 0 ?
another.getMaxValue() : one.getMaxValue();
+ } else if (one.getMaxValue() == null) {
+ maxValue = another.getMaxValue();
} else {
- maxValue = range1.getMaxValue();
+ maxValue = one.getMaxValue();
+ }
+
+ return new HoodieColumnRangeMetadata<T>(
+ one.getFilePath(),
+ one.getColumnName(), minValue, maxValue, one.getNumNulls() +
another.getNumNulls(), one.getStringifier());
+ }
+
+ private static Comparable<?> convertToNativeJavaType(PrimitiveType
primitiveType, Comparable val) {
+ if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {
Review comment:
when we read statistic info from paquet, we can use
Statistics.minAsString and Statistics.maxAsString to deal with DecimalType and
dateType correctly. and avoids additional conversion operations, as well as
synchronization operations
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
##########
@@ -29,148 +30,186 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
-object DataSkippingUtils {
+object DataSkippingUtils extends Logging {
/**
- * create z_index filter and push those filters to index table to filter
all candidate scan files.
- * @param condition origin filter from query.
- * @param indexSchema schema from index table.
- * @return filters for index table.
- */
- def createZindexFilter(condition: Expression, indexSchema: StructType):
Expression = {
- def buildExpressionInternal(colName: Seq[String], statisticValue: String):
Expression = {
- val appendColName = UnresolvedAttribute(colName).name + statisticValue
- col(appendColName).expr
- }
-
- def reWriteCondition(colName: Seq[String], conditionExpress: Expression):
Expression = {
- val appendColName = UnresolvedAttribute(colName).name + "_minValue"
- if (indexSchema.exists(p => p.name == appendColName)) {
- conditionExpress
- } else {
- Literal.TrueLiteral
- }
- }
-
- val minValue = (colName: Seq[String]) => buildExpressionInternal(colName,
"_minValue")
- val maxValue = (colName: Seq[String]) => buildExpressionInternal(colName,
"_maxValue")
- val num_nulls = (colName: Seq[String]) => buildExpressionInternal(colName,
"_num_nulls")
-
- condition match {
- // query filter "colA = b" convert it to "colA_minValue <= b and
colA_maxValue >= b" for index table
+ * Translates provided {@link filterExpr} into corresponding
filter-expression for Z-index index table
+ * to filter out candidate files that would hold records matching the
original filter
+ *
+ * @param filterExpr original filter from query
+ * @param indexSchema index table schema
+ * @return filter for Z-index table
+ */
+ def createZIndexLookupFilter(filterExpr: Expression, indexSchema:
StructType): Expression = {
+
+ def minValue(colName: String) = col(getMinColumnNameFor(colName)).expr
+ def maxValue(colName: String) = col(getMaxColumnNameFor(colName)).expr
+ def numNulls(colName: String) = col(getNumNullsColumnNameFor(colName)).expr
+
+ def colContainsValuesEqualToLiteral(colName: String, value: Literal) =
+ // Only case when column C contains value V is when min(C) <= V <= max(c)
+ And(LessThanOrEqual(minValue(colName), value),
GreaterThanOrEqual(maxValue(colName), value))
+
+ def colContainsValuesEqualToLiterals(colName: String, list: Seq[Literal]) =
+ // Only case when column C contains _any_ of the values V1, V2, etc is
when either
+ // min(C) <= V1 <= max(c) OR
+ // min(C) <= V2 <= max(c) OR
+ // ...
+ list.map { lit => colContainsValuesEqualToLiteral(colName, lit)
}.reduce(Or)
+
+ filterExpr match {
+ // Filter "colA = b"
+ // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition
for index lookup
case EqualTo(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, And(LessThanOrEqual(minValue(colName),
value), GreaterThanOrEqual(maxValue(colName), value)))
- // query filter "b = colA" convert it to "colA_minValue <= b and
colA_maxValue >= b" for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ colContainsValuesEqualToLiteral(colName, value)
+ // Filter "b = colA"
+ // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition
for index lookup
case EqualTo(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, And(LessThanOrEqual(minValue(colName),
value), GreaterThanOrEqual(maxValue(colName), value)))
- // query filter "colA = null" convert it to "colA_num_nulls = null" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ colContainsValuesEqualToLiteral(colName, value)
+ // Filter "colA = null"
+ // Translates to "colA_num_nulls = null" for index lookup
case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @
Literal(null, _)) =>
- val colName = getTargetColNameParts(equalNullSafe.left)
- reWriteCondition(colName, EqualTo(num_nulls(colName),
equalNullSafe.right))
- // query filter "colA < b" convert it to "colA_minValue < b" for index
table
+ val colName = getTargetColName(equalNullSafe.left, indexSchema)
+ EqualTo(numNulls(colName), equalNullSafe.right)
+ // Filter "colA < b"
+ // Translates to "colA_minValue < b" for index lookup
case LessThan(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName,LessThan(minValue(colName), value))
- // query filter "b < colA" convert it to "colA_maxValue > b" for index
table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThan(minValue(colName), value)
+ // Filter "b < colA"
+ // Translates to "b < colA_maxValue" for index lookup
case LessThan(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThan(maxValue(colName), value))
- // query filter "colA > b" convert it to "colA_maxValue > b" for index
table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThan(maxValue(colName), value)
+ // Filter "colA > b"
+ // Translates to "colA_maxValue > b" for index lookup
case GreaterThan(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThan(maxValue(colName), value))
- // query filter "b > colA" convert it to "colA_minValue < b" for index
table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThan(maxValue(colName), value)
+ // Filter "b > colA"
+ // Translates to "b > colA_minValue" for index lookup
case GreaterThan(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, LessThan(minValue(colName), value))
- // query filter "colA <= b" convert it to "colA_minValue <= b" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThan(minValue(colName), value)
+ // Filter "colA <= b"
+ // Translates to "colA_minValue <= b" for index lookup
case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
- // query filter "b <= colA" convert it to "colA_maxValue >= b" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThanOrEqual(minValue(colName), value)
+ // Filter "b <= colA"
+ // Translates to "b <= colA_maxValue" for index lookup
case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value))
- // query filter "colA >= b" convert it to "colA_maxValue >= b" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThanOrEqual(maxValue(colName), value)
+ // Filter "colA >= b"
+ // Translates to "colA_maxValue >= b" for index lookup
case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
- val colName = getTargetColNameParts(attribute)
+ val colName = getTargetColName(attribute, indexSchema)
GreaterThanOrEqual(maxValue(colName), right)
- // query filter "b >= colA" convert it to "colA_minValue <= b" for
index table
+ // Filter "b >= colA"
+ // Translates to "b >= colA_minValue" for index lookup
case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
- // query filter "colA is null" convert it to "colA_num_nulls > 0" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThanOrEqual(minValue(colName), value)
+ // Filter "colA is null"
+ // Translates to "colA_num_nulls > 0" for index lookup
case IsNull(attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThan(num_nulls(colName), Literal(0)))
- // query filter "colA is not null" convert it to "colA_num_nulls = 0"
for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThan(numNulls(colName), Literal(0))
+ // Filter "colA is not null"
+ // Translates to "colA_num_nulls = 0" for index lookup
case IsNotNull(attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, EqualTo(num_nulls(colName), Literal(0)))
- // query filter "colA in (a,b)" convert it to " (colA_minValue <= a
and colA_maxValue >= a) or (colA_minValue <= b and colA_maxValue >= b) " for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ EqualTo(numNulls(colName), Literal(0))
+ // Filter "colA in (a, b, ...)"
+ // Translates to "(colA_minValue <= a AND colA_maxValue >= a) OR
(colA_minValue <= b AND colA_maxValue >= b)" for index lookup
case In(attribute: AttributeReference, list: Seq[Literal]) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, list.map { lit =>
- And(LessThanOrEqual(minValue(colName), lit),
GreaterThanOrEqual(maxValue(colName), lit))
- }.reduce(Or))
- // query filter "colA like xxx" convert it to " (colA_minValue <= xxx
and colA_maxValue >= xxx) or (colA_min start with xxx or colA_max start with
xxx) " for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ colContainsValuesEqualToLiterals(colName, list)
+ // Filter "colA like xxx"
+ // Translates to "colA_minValue <= xxx AND colA_maxValue >= xxx" for
index lookup
+ // NOTE: That this operator only matches string prefixes, and this is
+ // essentially equivalent to "colA = b" expression
case StartsWith(attribute, v @ Literal(_: UTF8String, _)) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, Or(And(LessThanOrEqual(minValue(colName),
v), GreaterThanOrEqual(maxValue(colName), v)) ,
- Or(StartsWith(minValue(colName), v), StartsWith(maxValue(colName),
v))))
- // query filter "colA not in (a, b)" convert it to " (not(
colA_minValue = a and colA_maxValue = a)) and (not( colA_minValue = b and
colA_maxValue = b)) " for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ colContainsValuesEqualToLiteral(colName, v)
+ // Filter "colA not in (a, b, ...)"
+ // Translates to "(colA_minValue > a OR colA_maxValue < a) AND
(colA_minValue > b OR colA_maxValue < b)" for index lookup
+ // NOTE: This is an inversion of `in (a, b, ...)` expr
case Not(In(attribute: AttributeReference, list: Seq[Literal])) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, list.map { lit =>
- Not(And(EqualTo(minValue(colName), lit), EqualTo(maxValue(colName),
lit)))
- }.reduce(And))
- // query filter "colA != b" convert it to "not ( colA_minValue = b and
colA_maxValue = b )" for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ Not(colContainsValuesEqualToLiterals(colName, list))
+ // Filter "colA != b"
+ // Translates to "colA_minValue > b OR colA_maxValue < b" (which is an
inversion of expr for "colA = b") for index lookup
+ // NOTE: This is an inversion of `colA = b` expr
case Not(EqualTo(attribute: AttributeReference, value: Literal)) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, Not(And(EqualTo(minValue(colName), value),
EqualTo(maxValue(colName), value))))
- // query filter "b != colA" convert it to "not ( colA_minValue = b and
colA_maxValue = b )" for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ Not(colContainsValuesEqualToLiteral(colName, value))
+ // Filter "b != colA"
+ // Translates to "colA_minValue > b OR colA_maxValue < b" (which is an
inversion of expr for "colA = b") for index lookup
+ // NOTE: This is an inversion of `colA != b` expr
case Not(EqualTo(value: Literal, attribute: AttributeReference)) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, Not(And(EqualTo(minValue(colName), value),
EqualTo(maxValue(colName), value))))
- // query filter "colA not like xxxx" convert it to "not (
colA_minValue startWith xxx and colA_maxValue startWith xxx)" for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ Not(colContainsValuesEqualToLiteral(colName, value))
+ // Filter "colA not like xxx"
+ // Translates to "!(colA_minValue <= xxx AND colA_maxValue >= xxx)" for
index lookup
+ // NOTE: This is a inversion of "colA like xxx" assuming that colA is a
string-based type
case Not(StartsWith(attribute, value @ Literal(_: UTF8String, _))) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, Not(And(StartsWith(minValue(colName),
value), StartsWith(maxValue(colName), value))))
+ val colName = getTargetColName(attribute, indexSchema)
+ Not(colContainsValuesEqualToLiteral(colName, value))
+
case or: Or =>
- val resLeft = createZindexFilter(or.left, indexSchema)
- val resRight = createZindexFilter(or.right, indexSchema)
+ val resLeft = createZIndexLookupFilter(or.left, indexSchema)
+ val resRight = createZIndexLookupFilter(or.right, indexSchema)
Or(resLeft, resRight)
case and: And =>
- val resLeft = createZindexFilter(and.left, indexSchema)
- val resRight = createZindexFilter(and.right, indexSchema)
+ val resLeft = createZIndexLookupFilter(and.left, indexSchema)
+ val resRight = createZIndexLookupFilter(and.right, indexSchema)
And(resLeft, resRight)
case expr: Expression =>
Literal.TrueLiteral
}
}
- /**
- * Extracts name from a resolved expression referring to a nested or
non-nested column.
- */
- def getTargetColNameParts(resolvedTargetCol: Expression): Seq[String] = {
+ private def checkColIsIndexed(colName: String, indexSchema: StructType):
Boolean = {
+ Set.apply(
+ getMinColumnNameFor(colName),
+ getMaxColumnNameFor(colName),
+ getNumNullsColumnNameFor(colName)
+ )
+ .forall(stat => indexSchema.exists(_.name == stat))
+ }
+
+ private def getTargetColName(resolvedExpr: Expression, indexSchema:
StructType): String = {
+ val colName = UnresolvedAttribute(getTargetColNameParts(resolvedExpr)).name
+
+ // Verify that the column is indexed
+ // NOTE: That in case filtering expression contains referenced to a column
that is NOT
+ // indexed data pruning becomes (largely) impossible
+ if (!checkColIsIndexed(colName, indexSchema)) {
+ logDebug(s"Filtering expression contains column that is not indexed
($colName)")
+ throw new AnalysisException(s"Filtering expression contains column that
is not indexed ($colName)")
Review comment:
This place should not throw exceptions. For a filter, if the column it
refers to does not exist in the index, just change the filter to
literal.trueliteral. View the original logic of rewritecondition, In this way,
data will not be lost in the process of index query
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
##########
@@ -29,148 +30,186 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
-object DataSkippingUtils {
+object DataSkippingUtils extends Logging {
/**
- * create z_index filter and push those filters to index table to filter
all candidate scan files.
- * @param condition origin filter from query.
- * @param indexSchema schema from index table.
- * @return filters for index table.
- */
- def createZindexFilter(condition: Expression, indexSchema: StructType):
Expression = {
- def buildExpressionInternal(colName: Seq[String], statisticValue: String):
Expression = {
- val appendColName = UnresolvedAttribute(colName).name + statisticValue
- col(appendColName).expr
- }
-
- def reWriteCondition(colName: Seq[String], conditionExpress: Expression):
Expression = {
- val appendColName = UnresolvedAttribute(colName).name + "_minValue"
- if (indexSchema.exists(p => p.name == appendColName)) {
- conditionExpress
- } else {
- Literal.TrueLiteral
- }
- }
-
- val minValue = (colName: Seq[String]) => buildExpressionInternal(colName,
"_minValue")
- val maxValue = (colName: Seq[String]) => buildExpressionInternal(colName,
"_maxValue")
- val num_nulls = (colName: Seq[String]) => buildExpressionInternal(colName,
"_num_nulls")
-
- condition match {
- // query filter "colA = b" convert it to "colA_minValue <= b and
colA_maxValue >= b" for index table
+ * Translates provided {@link filterExpr} into corresponding
filter-expression for Z-index index table
+ * to filter out candidate files that would hold records matching the
original filter
+ *
+ * @param filterExpr original filter from query
+ * @param indexSchema index table schema
+ * @return filter for Z-index table
+ */
+ def createZIndexLookupFilter(filterExpr: Expression, indexSchema:
StructType): Expression = {
+
+ def minValue(colName: String) = col(getMinColumnNameFor(colName)).expr
+ def maxValue(colName: String) = col(getMaxColumnNameFor(colName)).expr
+ def numNulls(colName: String) = col(getNumNullsColumnNameFor(colName)).expr
+
+ def colContainsValuesEqualToLiteral(colName: String, value: Literal) =
+ // Only case when column C contains value V is when min(C) <= V <= max(c)
+ And(LessThanOrEqual(minValue(colName), value),
GreaterThanOrEqual(maxValue(colName), value))
+
+ def colContainsValuesEqualToLiterals(colName: String, list: Seq[Literal]) =
+ // Only case when column C contains _any_ of the values V1, V2, etc is
when either
+ // min(C) <= V1 <= max(c) OR
+ // min(C) <= V2 <= max(c) OR
+ // ...
+ list.map { lit => colContainsValuesEqualToLiteral(colName, lit)
}.reduce(Or)
+
+ filterExpr match {
+ // Filter "colA = b"
+ // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition
for index lookup
case EqualTo(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, And(LessThanOrEqual(minValue(colName),
value), GreaterThanOrEqual(maxValue(colName), value)))
- // query filter "b = colA" convert it to "colA_minValue <= b and
colA_maxValue >= b" for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ colContainsValuesEqualToLiteral(colName, value)
+ // Filter "b = colA"
+ // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition
for index lookup
case EqualTo(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, And(LessThanOrEqual(minValue(colName),
value), GreaterThanOrEqual(maxValue(colName), value)))
- // query filter "colA = null" convert it to "colA_num_nulls = null" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ colContainsValuesEqualToLiteral(colName, value)
+ // Filter "colA = null"
+ // Translates to "colA_num_nulls = null" for index lookup
case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @
Literal(null, _)) =>
- val colName = getTargetColNameParts(equalNullSafe.left)
- reWriteCondition(colName, EqualTo(num_nulls(colName),
equalNullSafe.right))
- // query filter "colA < b" convert it to "colA_minValue < b" for index
table
+ val colName = getTargetColName(equalNullSafe.left, indexSchema)
+ EqualTo(numNulls(colName), equalNullSafe.right)
+ // Filter "colA < b"
+ // Translates to "colA_minValue < b" for index lookup
case LessThan(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName,LessThan(minValue(colName), value))
- // query filter "b < colA" convert it to "colA_maxValue > b" for index
table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThan(minValue(colName), value)
+ // Filter "b < colA"
+ // Translates to "b < colA_maxValue" for index lookup
case LessThan(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThan(maxValue(colName), value))
- // query filter "colA > b" convert it to "colA_maxValue > b" for index
table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThan(maxValue(colName), value)
+ // Filter "colA > b"
+ // Translates to "colA_maxValue > b" for index lookup
case GreaterThan(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThan(maxValue(colName), value))
- // query filter "b > colA" convert it to "colA_minValue < b" for index
table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThan(maxValue(colName), value)
+ // Filter "b > colA"
+ // Translates to "b > colA_minValue" for index lookup
case GreaterThan(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, LessThan(minValue(colName), value))
- // query filter "colA <= b" convert it to "colA_minValue <= b" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThan(minValue(colName), value)
+ // Filter "colA <= b"
+ // Translates to "colA_minValue <= b" for index lookup
case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
- // query filter "b <= colA" convert it to "colA_maxValue >= b" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThanOrEqual(minValue(colName), value)
+ // Filter "b <= colA"
+ // Translates to "b <= colA_maxValue" for index lookup
case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value))
- // query filter "colA >= b" convert it to "colA_maxValue >= b" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThanOrEqual(maxValue(colName), value)
+ // Filter "colA >= b"
+ // Translates to "colA_maxValue >= b" for index lookup
case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
- val colName = getTargetColNameParts(attribute)
+ val colName = getTargetColName(attribute, indexSchema)
GreaterThanOrEqual(maxValue(colName), right)
- // query filter "b >= colA" convert it to "colA_minValue <= b" for
index table
+ // Filter "b >= colA"
+ // Translates to "b >= colA_minValue" for index lookup
case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
- // query filter "colA is null" convert it to "colA_num_nulls > 0" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThanOrEqual(minValue(colName), value)
+ // Filter "colA is null"
+ // Translates to "colA_num_nulls > 0" for index lookup
case IsNull(attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThan(num_nulls(colName), Literal(0)))
- // query filter "colA is not null" convert it to "colA_num_nulls = 0"
for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThan(numNulls(colName), Literal(0))
+ // Filter "colA is not null"
+ // Translates to "colA_num_nulls = 0" for index lookup
case IsNotNull(attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, EqualTo(num_nulls(colName), Literal(0)))
- // query filter "colA in (a,b)" convert it to " (colA_minValue <= a
and colA_maxValue >= a) or (colA_minValue <= b and colA_maxValue >= b) " for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ EqualTo(numNulls(colName), Literal(0))
+ // Filter "colA in (a, b, ...)"
+ // Translates to "(colA_minValue <= a AND colA_maxValue >= a) OR
(colA_minValue <= b AND colA_maxValue >= b)" for index lookup
case In(attribute: AttributeReference, list: Seq[Literal]) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, list.map { lit =>
- And(LessThanOrEqual(minValue(colName), lit),
GreaterThanOrEqual(maxValue(colName), lit))
- }.reduce(Or))
- // query filter "colA like xxx" convert it to " (colA_minValue <= xxx
and colA_maxValue >= xxx) or (colA_min start with xxx or colA_max start with
xxx) " for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ colContainsValuesEqualToLiterals(colName, list)
+ // Filter "colA like xxx"
+ // Translates to "colA_minValue <= xxx AND colA_maxValue >= xxx" for
index lookup
+ // NOTE: That this operator only matches string prefixes, and this is
+ // essentially equivalent to "colA = b" expression
case StartsWith(attribute, v @ Literal(_: UTF8String, _)) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, Or(And(LessThanOrEqual(minValue(colName),
v), GreaterThanOrEqual(maxValue(colName), v)) ,
- Or(StartsWith(minValue(colName), v), StartsWith(maxValue(colName),
v))))
- // query filter "colA not in (a, b)" convert it to " (not(
colA_minValue = a and colA_maxValue = a)) and (not( colA_minValue = b and
colA_maxValue = b)) " for index table
+ val colName = getTargetColName(attribute, indexSchema)
Review comment:
original logical is : // query filter "colA like xxx" convert it to "
(colA_minValue <= xxx and colA_maxValue >= xxx) or (colA_min start with xxx or
colA_max start with xxx) " for index table
why remove or (colA_min start with xxx or colA_max start with xxx) 。 it
should be worked. think the follow quey:
select count(*) from conn_optimize where src_ip like '157%' and dst_ip like
'216.%'
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
##########
@@ -29,148 +30,186 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
-object DataSkippingUtils {
+object DataSkippingUtils extends Logging {
/**
- * create z_index filter and push those filters to index table to filter
all candidate scan files.
- * @param condition origin filter from query.
- * @param indexSchema schema from index table.
- * @return filters for index table.
- */
- def createZindexFilter(condition: Expression, indexSchema: StructType):
Expression = {
- def buildExpressionInternal(colName: Seq[String], statisticValue: String):
Expression = {
- val appendColName = UnresolvedAttribute(colName).name + statisticValue
- col(appendColName).expr
- }
-
- def reWriteCondition(colName: Seq[String], conditionExpress: Expression):
Expression = {
- val appendColName = UnresolvedAttribute(colName).name + "_minValue"
- if (indexSchema.exists(p => p.name == appendColName)) {
- conditionExpress
- } else {
- Literal.TrueLiteral
- }
- }
-
- val minValue = (colName: Seq[String]) => buildExpressionInternal(colName,
"_minValue")
- val maxValue = (colName: Seq[String]) => buildExpressionInternal(colName,
"_maxValue")
- val num_nulls = (colName: Seq[String]) => buildExpressionInternal(colName,
"_num_nulls")
-
- condition match {
- // query filter "colA = b" convert it to "colA_minValue <= b and
colA_maxValue >= b" for index table
+ * Translates provided {@link filterExpr} into corresponding
filter-expression for Z-index index table
+ * to filter out candidate files that would hold records matching the
original filter
+ *
+ * @param filterExpr original filter from query
+ * @param indexSchema index table schema
+ * @return filter for Z-index table
+ */
+ def createZIndexLookupFilter(filterExpr: Expression, indexSchema:
StructType): Expression = {
+
+ def minValue(colName: String) = col(getMinColumnNameFor(colName)).expr
+ def maxValue(colName: String) = col(getMaxColumnNameFor(colName)).expr
+ def numNulls(colName: String) = col(getNumNullsColumnNameFor(colName)).expr
+
+ def colContainsValuesEqualToLiteral(colName: String, value: Literal) =
+ // Only case when column C contains value V is when min(C) <= V <= max(c)
+ And(LessThanOrEqual(minValue(colName), value),
GreaterThanOrEqual(maxValue(colName), value))
+
+ def colContainsValuesEqualToLiterals(colName: String, list: Seq[Literal]) =
+ // Only case when column C contains _any_ of the values V1, V2, etc is
when either
+ // min(C) <= V1 <= max(c) OR
+ // min(C) <= V2 <= max(c) OR
+ // ...
+ list.map { lit => colContainsValuesEqualToLiteral(colName, lit)
}.reduce(Or)
+
+ filterExpr match {
+ // Filter "colA = b"
+ // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition
for index lookup
case EqualTo(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, And(LessThanOrEqual(minValue(colName),
value), GreaterThanOrEqual(maxValue(colName), value)))
- // query filter "b = colA" convert it to "colA_minValue <= b and
colA_maxValue >= b" for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ colContainsValuesEqualToLiteral(colName, value)
+ // Filter "b = colA"
+ // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition
for index lookup
case EqualTo(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, And(LessThanOrEqual(minValue(colName),
value), GreaterThanOrEqual(maxValue(colName), value)))
- // query filter "colA = null" convert it to "colA_num_nulls = null" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ colContainsValuesEqualToLiteral(colName, value)
+ // Filter "colA = null"
+ // Translates to "colA_num_nulls = null" for index lookup
case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @
Literal(null, _)) =>
- val colName = getTargetColNameParts(equalNullSafe.left)
- reWriteCondition(colName, EqualTo(num_nulls(colName),
equalNullSafe.right))
- // query filter "colA < b" convert it to "colA_minValue < b" for index
table
+ val colName = getTargetColName(equalNullSafe.left, indexSchema)
+ EqualTo(numNulls(colName), equalNullSafe.right)
+ // Filter "colA < b"
+ // Translates to "colA_minValue < b" for index lookup
case LessThan(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName,LessThan(minValue(colName), value))
- // query filter "b < colA" convert it to "colA_maxValue > b" for index
table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThan(minValue(colName), value)
+ // Filter "b < colA"
+ // Translates to "b < colA_maxValue" for index lookup
case LessThan(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThan(maxValue(colName), value))
- // query filter "colA > b" convert it to "colA_maxValue > b" for index
table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThan(maxValue(colName), value)
+ // Filter "colA > b"
+ // Translates to "colA_maxValue > b" for index lookup
case GreaterThan(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThan(maxValue(colName), value))
- // query filter "b > colA" convert it to "colA_minValue < b" for index
table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThan(maxValue(colName), value)
+ // Filter "b > colA"
+ // Translates to "b > colA_minValue" for index lookup
case GreaterThan(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, LessThan(minValue(colName), value))
- // query filter "colA <= b" convert it to "colA_minValue <= b" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThan(minValue(colName), value)
+ // Filter "colA <= b"
+ // Translates to "colA_minValue <= b" for index lookup
case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
- // query filter "b <= colA" convert it to "colA_maxValue >= b" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThanOrEqual(minValue(colName), value)
+ // Filter "b <= colA"
+ // Translates to "b <= colA_maxValue" for index lookup
case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value))
- // query filter "colA >= b" convert it to "colA_maxValue >= b" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThanOrEqual(maxValue(colName), value)
+ // Filter "colA >= b"
+ // Translates to "colA_maxValue >= b" for index lookup
case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
- val colName = getTargetColNameParts(attribute)
+ val colName = getTargetColName(attribute, indexSchema)
GreaterThanOrEqual(maxValue(colName), right)
- // query filter "b >= colA" convert it to "colA_minValue <= b" for
index table
+ // Filter "b >= colA"
+ // Translates to "b >= colA_minValue" for index lookup
case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
- // query filter "colA is null" convert it to "colA_num_nulls > 0" for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ LessThanOrEqual(minValue(colName), value)
+ // Filter "colA is null"
+ // Translates to "colA_num_nulls > 0" for index lookup
case IsNull(attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, GreaterThan(num_nulls(colName), Literal(0)))
- // query filter "colA is not null" convert it to "colA_num_nulls = 0"
for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ GreaterThan(numNulls(colName), Literal(0))
+ // Filter "colA is not null"
+ // Translates to "colA_num_nulls = 0" for index lookup
case IsNotNull(attribute: AttributeReference) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, EqualTo(num_nulls(colName), Literal(0)))
- // query filter "colA in (a,b)" convert it to " (colA_minValue <= a
and colA_maxValue >= a) or (colA_minValue <= b and colA_maxValue >= b) " for
index table
+ val colName = getTargetColName(attribute, indexSchema)
+ EqualTo(numNulls(colName), Literal(0))
+ // Filter "colA in (a, b, ...)"
+ // Translates to "(colA_minValue <= a AND colA_maxValue >= a) OR
(colA_minValue <= b AND colA_maxValue >= b)" for index lookup
case In(attribute: AttributeReference, list: Seq[Literal]) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, list.map { lit =>
- And(LessThanOrEqual(minValue(colName), lit),
GreaterThanOrEqual(maxValue(colName), lit))
- }.reduce(Or))
- // query filter "colA like xxx" convert it to " (colA_minValue <= xxx
and colA_maxValue >= xxx) or (colA_min start with xxx or colA_max start with
xxx) " for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ colContainsValuesEqualToLiterals(colName, list)
+ // Filter "colA like xxx"
+ // Translates to "colA_minValue <= xxx AND colA_maxValue >= xxx" for
index lookup
+ // NOTE: That this operator only matches string prefixes, and this is
+ // essentially equivalent to "colA = b" expression
case StartsWith(attribute, v @ Literal(_: UTF8String, _)) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, Or(And(LessThanOrEqual(minValue(colName),
v), GreaterThanOrEqual(maxValue(colName), v)) ,
- Or(StartsWith(minValue(colName), v), StartsWith(maxValue(colName),
v))))
- // query filter "colA not in (a, b)" convert it to " (not(
colA_minValue = a and colA_maxValue = a)) and (not( colA_minValue = b and
colA_maxValue = b)) " for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ colContainsValuesEqualToLiteral(colName, v)
+ // Filter "colA not in (a, b, ...)"
+ // Translates to "(colA_minValue > a OR colA_maxValue < a) AND
(colA_minValue > b OR colA_maxValue < b)" for index lookup
+ // NOTE: This is an inversion of `in (a, b, ...)` expr
case Not(In(attribute: AttributeReference, list: Seq[Literal])) =>
- val colName = getTargetColNameParts(attribute)
- reWriteCondition(colName, list.map { lit =>
- Not(And(EqualTo(minValue(colName), lit), EqualTo(maxValue(colName),
lit)))
- }.reduce(And))
- // query filter "colA != b" convert it to "not ( colA_minValue = b and
colA_maxValue = b )" for index table
+ val colName = getTargetColName(attribute, indexSchema)
+ Not(colContainsValuesEqualToLiterals(colName, list))
+ // Filter "colA != b"
+ // Translates to "colA_minValue > b OR colA_maxValue < b" (which is an
inversion of expr for "colA = b") for index lookup
+ // NOTE: This is an inversion of `colA = b` expr
Review comment:
A filter of type not cannot be operated directly in this way。
think that we have a filter colA != 3,
the file1 contains three values 1,2,4 , now colA_minValue= 1 and
colA_maxValue = 4
the translate filter: 1 > 3 or 4 < 3 This judgment condition will not hold,
and file1 will be excluded, this is wrong.
All not operations cannot be reversed directly, and data will be lost
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]