This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 306f2bb03e6 [HUDI-6077] Add more partition push down filters (#8452)
306f2bb03e6 is described below
commit 306f2bb03e6a42e2bca5f4e0db0379a87ad58a66
Author: Rex(Hui) An <[email protected]>
AuthorDate: Mon Jul 24 18:43:10 2023 +0800
[HUDI-6077] Add more partition push down filters (#8452)
---
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 7 +-
.../functional/TestHoodieBackedMetadata.java | 2 +-
.../io/storage/row/TestHoodieRowCreateHandle.java | 11 +-
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 22 ++
.../java/org/apache/hudi/expression/ArrayData.java | 21 +-
.../apache/hudi/expression/BinaryExpression.java | 37 +-
.../org/apache/hudi/expression/BindVisitor.java | 187 ++++++++++
.../org/apache/hudi/expression/BoundReference.java | 32 +-
.../org/apache/hudi/expression/Comparators.java | 52 +++
.../org/apache/hudi}/expression/Expression.java | 32 +-
.../apache/hudi}/expression/ExpressionVisitor.java | 16 +-
.../apache/hudi}/expression/LeafExpression.java | 11 +-
.../java/org/apache/hudi/expression/Literal.java | 121 ++++++
.../org/apache/hudi/expression/NameReference.java | 20 +-
.../apache/hudi/expression/PartialBindVisitor.java | 153 ++++++++
.../java/org/apache/hudi/expression/Predicate.java | 29 +-
.../org/apache/hudi/expression/Predicates.java | 413 +++++++++++++++++++++
.../org/apache/hudi/expression/StructLike.java | 13 +-
.../java/org/apache/hudi/internal/schema/Type.java | 80 +++-
.../org/apache/hudi/internal/schema/Types.java | 37 +-
.../hudi/internal/schema/utils/Conversions.java | 49 +++
.../hudi/metadata/AbstractHoodieTableMetadata.java | 96 +++++
.../apache/hudi/metadata/BaseTableMetadata.java | 24 +-
.../metadata/FileSystemBackedTableMetadata.java | 111 +++++-
.../hudi/metadata/HoodieBackedTableMetadata.java | 23 ++
.../apache/hudi/metadata/HoodieTableMetadata.java | 10 +
.../hudi/expression/TestPartialBindVisitor.java | 83 +++++
.../schema/utils/TestAvroSchemaEvolutionUtils.java | 2 +-
.../TestFileSystemBackedTableMetadata.java | 12 +-
.../scala/org/apache/hudi/SparkFilterHelper.scala | 122 ++++++
.../apache/hudi/SparkHoodieTableFileIndex.scala | 92 +++--
.../org/apache/hudi/TestHoodieFileIndex.scala | 72 ++++
.../org/apache/hudi/TestSparkFilterHelper.scala | 185 +++++++++
.../sql/hudi/TestHoodieInternalRowUtils.scala | 2 +-
.../sql/hudi/TestLazyPartitionPathFetching.scala | 30 ++
.../TestPartitionPushDownWhenListingPaths.scala | 109 ++++++
.../apache/spark/sql/adapter/Spark2Adapter.scala | 15 +-
.../spark/sql/adapter/BaseSpark3Adapter.scala | 7 +-
.../apache/hudi/hive/util/FilterGenVisitor.java | 84 +++--
.../hudi/hive/util/PartitionFilterGenerator.java | 64 ++--
.../hudi/utilities/HoodieDataTableValidator.java | 2 +-
.../apache/hudi/utilities/HoodieRepairTool.java | 2 +-
42 files changed, 2260 insertions(+), 232 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index cc72cf23a6b..782a49ac189 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.{InternalRow,
TableIdentifier}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
import org.apache.spark.storage.StorageLevel
@@ -203,4 +203,9 @@ trait SparkAdapter extends Serializable {
* Converts instance of [[StorageLevel]] to a corresponding string
*/
def convertStorageLevelToString(level: StorageLevel): String
+
+ /**
+ * Tries to translate a Catalyst Expression into data source Filter
+ */
+ def translateFilter(predicate: Expression, supportNestedPredicatePushdown:
Boolean = false): Option[Filter]
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 56fabb362b6..5288947bb19 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -3204,7 +3204,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// Partitions should match
- FileSystemBackedTableMetadata fsBackedTableMetadata = new
FileSystemBackedTableMetadata(engineContext,
+ FileSystemBackedTableMetadata fsBackedTableMetadata = new
FileSystemBackedTableMetadata(engineContext, metaClient.getTableConfig(),
new SerializableConfiguration(hadoopConf), config.getBasePath(),
config.shouldAssumeDatePartitioning());
List<String> fsPartitions = fsBackedTableMetadata.getAllPartitionPaths();
List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
index 6dc90af78ae..42e84ebe7fd 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
@@ -190,16 +189,8 @@ public class TestHoodieRowCreateHandle extends
HoodieClientTestHarness {
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
new HoodieRowCreateHandle(table, cfg, " def",
UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(),
RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
fail("Should have thrown exception");
- } catch (HoodieInsertException ioe) {
- // expected without metadata table
- if (enableMetadataTable) {
- fail("Should have thrown TableNotFoundException");
- }
} catch (TableNotFoundException e) {
- // expected with metadata table
- if (!enableMetadataTable) {
- fail("Should have thrown HoodieInsertException");
- }
+ // expected: exception is thrown
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index c5adafa38e2..3a24ef4dd2f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -36,7 +36,9 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.expression.Expression;
import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -270,6 +272,26 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
));
}
+ protected List<PartitionPath> listPartitionPaths(List<String>
relativePartitionPaths,
+ Types.RecordType
partitionFields,
+ Expression
partitionColumnPredicates) {
+ List<String> matchedPartitionPaths;
+ try {
+ matchedPartitionPaths =
tableMetadata.getPartitionPathWithPathPrefixUsingFilterExpression(relativePartitionPaths,
+ partitionFields, partitionColumnPredicates);
+ } catch (IOException e) {
+ throw new HoodieIOException("Error fetching partition paths", e);
+ }
+
+ // Convert partition's path into partition descriptor
+ return matchedPartitionPaths.stream()
+ .map(partitionPath -> {
+ Object[] partitionColumnValues =
parsePartitionColumnValues(partitionColumns, partitionPath);
+ return new PartitionPath(partitionPath, partitionColumnValues);
+ })
+ .collect(Collectors.toList());
+ }
+
protected List<PartitionPath> listPartitionPaths(List<String>
relativePartitionPaths) {
List<String> matchedPartitionPaths;
try {
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java
b/hudi-common/src/main/java/org/apache/hudi/expression/ArrayData.java
similarity index 69%
copy from
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java
copy to hudi-common/src/main/java/org/apache/hudi/expression/ArrayData.java
index 3be19330a19..3f7830e996c 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/ArrayData.java
@@ -16,22 +16,25 @@
* limitations under the License.
*/
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
-public class AttributeReferenceExpression extends LeafExpression {
+import java.util.List;
- private final String name;
+public class ArrayData implements StructLike {
- public AttributeReferenceExpression(String name) {
- this.name = name;
+ private final List<Object> data;
+
+ public ArrayData(List<Object> data) {
+ this.data = data;
}
- public String getName() {
- return name;
+ @Override
+ public int numFields() {
+ return data.size();
}
@Override
- public <T> T accept(ExpressionVisitor<T> exprVisitor) {
- return exprVisitor.visitAttribute(this);
+ public <T> T get(int pos, Class<T> classTag) {
+ return classTag.cast(data.get(pos));
}
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/BinaryOperator.java
b/hudi-common/src/main/java/org/apache/hudi/expression/BinaryExpression.java
similarity index 56%
rename from
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/BinaryOperator.java
rename to
hudi-common/src/main/java/org/apache/hudi/expression/BinaryExpression.java
index d2bc83e9452..a0afdc18928 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/BinaryOperator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/BinaryExpression.java
@@ -16,46 +16,26 @@
* limitations under the License.
*/
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
import java.util.Arrays;
+import java.util.List;
/**
* The expression that accepts two child expressions.
*/
-public class BinaryOperator extends Expression {
+public abstract class BinaryExpression implements Expression {
private final Operator operator;
private final Expression left;
private final Expression right;
- private BinaryOperator(Expression left, Operator operator, Expression right)
{
- super(Arrays.asList(left, right));
+ BinaryExpression(Expression left, Operator operator, Expression right) {
this.left = left;
this.operator = operator;
this.right = right;
}
- public static BinaryOperator and(Expression left, Expression right) {
- return new BinaryOperator(left, Operator.AND, right);
- }
-
- public static BinaryOperator or(Expression left, Expression right) {
- return new BinaryOperator(left, Operator.OR, right);
- }
-
- public static BinaryOperator eq(Expression left, Expression right) {
- return new BinaryOperator(left, Operator.EQ, right);
- }
-
- public static BinaryOperator gteq(Expression left, Expression right) {
- return new BinaryOperator(left, Operator.GT_EQ, right);
- }
-
- public static BinaryOperator lteq(Expression left, Expression right) {
- return new BinaryOperator(left, Operator.LT_EQ, right);
- }
-
public Operator getOperator() {
return operator;
}
@@ -69,7 +49,12 @@ public class BinaryOperator extends Expression {
}
@Override
- public <T> T accept(ExpressionVisitor<T> exprVisitor) {
- return exprVisitor.visitBinaryOperator(this);
+ public List<Expression> getChildren() {
+ return Arrays.asList(left, right);
+ }
+
+ @Override
+ public String toString() {
+ return left.toString() + " " + operator.symbol + " " + right.toString();
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/expression/BindVisitor.java
b/hudi-common/src/main/java/org/apache/hudi/expression/BindVisitor.java
new file mode 100644
index 00000000000..2b7e589af21
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/BindVisitor.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class BindVisitor implements ExpressionVisitor<Expression> {
+
+ protected final Types.RecordType recordType;
+ protected final boolean caseSensitive;
+
+ public BindVisitor(Types.RecordType recordType, boolean caseSensitive) {
+ this.recordType = recordType;
+ this.caseSensitive = caseSensitive;
+ }
+
+ @Override
+ public Expression alwaysTrue() {
+ return Predicates.TrueExpression.get();
+ }
+
+ @Override
+ public Expression alwaysFalse() {
+ return Predicates.FalseExpression.get();
+ }
+
+ @Override
+ public Expression visitAnd(Predicates.And and) {
+ if (and.getLeft() instanceof Predicates.FalseExpression
+ || and.getRight() instanceof Predicates.FalseExpression) {
+ return alwaysFalse();
+ }
+
+ Expression left = and.getLeft().accept(this);
+ Expression right = and.getRight().accept(this);
+ if (left instanceof Predicates.FalseExpression
+ || right instanceof Predicates.FalseExpression) {
+ return alwaysFalse();
+ }
+
+ if (left instanceof Predicates.TrueExpression
+ && right instanceof Predicates.TrueExpression) {
+ return alwaysTrue();
+ }
+
+ if (left instanceof Predicates.TrueExpression) {
+ return right;
+ }
+
+ if (right instanceof Predicates.TrueExpression) {
+ return left;
+ }
+
+ return Predicates.and(left, right);
+ }
+
+ @Override
+ public Expression visitOr(Predicates.Or or) {
+ if (or.getLeft() instanceof Predicates.TrueExpression
+ || or.getRight() instanceof Predicates.TrueExpression) {
+ return alwaysTrue();
+ }
+
+ Expression left = or.getLeft().accept(this);
+ Expression right = or.getRight().accept(this);
+ if (left instanceof Predicates.TrueExpression
+ || right instanceof Predicates.TrueExpression) {
+ return alwaysTrue();
+ }
+
+ if (left instanceof Predicates.FalseExpression
+ && right instanceof Predicates.FalseExpression) {
+ return alwaysFalse();
+ }
+
+ if (left instanceof Predicates.FalseExpression) {
+ return right;
+ }
+
+ if (right instanceof Predicates.FalseExpression) {
+ return left;
+ }
+
+ return Predicates.or(left, right);
+ }
+
+ @Override
+ public Expression visitLiteral(Literal literal) {
+ return literal;
+ }
+
+ @Override
+ public Expression visitNameReference(NameReference attribute) {
+ Types.Field field = caseSensitive
+ ? recordType.fieldByName(attribute.getName())
+ : recordType.fieldByNameCaseInsensitive(attribute.getName());
+
+ if (field == null) {
+ throw new IllegalArgumentException("The attribute " + attribute
+ + " cannot be bound from schema " + recordType);
+ }
+
+ return new BoundReference(field.fieldId(), field.type());
+ }
+
+ @Override
+ public Expression visitBoundReference(BoundReference boundReference) {
+ return boundReference;
+ }
+
+ @Override
+ public Expression visitPredicate(Predicate predicate) {
+ if (predicate instanceof Predicates.Not) {
+ Expression expr = ((Predicates.Not) predicate).child.accept(this);
+ if (expr instanceof Predicates.TrueExpression) {
+ return alwaysFalse();
+ }
+ if (expr instanceof Predicates.FalseExpression) {
+ return alwaysTrue();
+ }
+
+ return Predicates.not(expr);
+ }
+
+ if (predicate instanceof Predicates.BinaryComparison) {
+ Predicates.BinaryComparison binaryExp = (Predicates.BinaryComparison)
predicate;
+ Expression left = binaryExp.getLeft().accept(this);
+ Expression right = binaryExp.getRight().accept(this);
+ return new Predicates.BinaryComparison(left, binaryExp.getOperator(),
right);
+ }
+
+ if (predicate instanceof Predicates.In) {
+ Predicates.In in = ((Predicates.In) predicate);
+ Expression valueExpression = in.value.accept(this);
+ List<Expression> validValues = in.validValues.stream()
+ .map(validValue -> validValue.accept(this))
+ .collect(Collectors.toList());
+
+ return Predicates.in(valueExpression, validValues);
+ }
+
+ if (predicate instanceof Predicates.IsNull) {
+ Predicates.IsNull isNull = (Predicates.IsNull) predicate;
+ return Predicates.isNull(isNull.child.accept(this));
+ }
+
+ if (predicate instanceof Predicates.IsNotNull) {
+ Predicates.IsNotNull isNotNull = (Predicates.IsNotNull) predicate;
+ return Predicates.isNotNull(isNotNull.child.accept(this));
+ }
+
+ if (predicate instanceof Predicates.StringStartsWith) {
+ Predicates.StringStartsWith contains = (Predicates.StringStartsWith)
predicate;
+ Expression left = contains.getLeft().accept(this);
+ Expression right = contains.getRight().accept(this);
+ return Predicates.startsWith(left, right);
+ }
+
+ if (predicate instanceof Predicates.StringContains) {
+ Predicates.StringContains contains = (Predicates.StringContains)
predicate;
+ Expression left = contains.getLeft().accept(this);
+ Expression right = contains.getRight().accept(this);
+ return Predicates.contains(left, right);
+ }
+
+    throw new IllegalArgumentException("The expression " + this + " cannot be
visited as predicate");
+ }
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java
b/hudi-common/src/main/java/org/apache/hudi/expression/BoundReference.java
similarity index 60%
copy from
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java
copy to hudi-common/src/main/java/org/apache/hudi/expression/BoundReference.java
index f84756a6a85..f3a7e0fac51 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/BoundReference.java
@@ -16,28 +16,38 @@
* limitations under the License.
*/
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
-public class Literal extends LeafExpression {
+import org.apache.hudi.internal.schema.Type;
- private final String value;
- private final String type;
+public class BoundReference extends LeafExpression {
- public Literal(String value, String type) {
- this.value = value;
+ private final int ordinal;
+ private final Type type;
+
+ public BoundReference(int ordinal, Type type) {
+ this.ordinal = ordinal;
this.type = type;
}
- public String getValue() {
- return value;
+ @Override
+ public Type getDataType() {
+ return type;
}
- public String getType() {
- return type;
+ @Override
+ public Object eval(StructLike data) {
+ return data.get(ordinal, this.type.typeId().getClassTag());
}
@Override
public <T> T accept(ExpressionVisitor<T> exprVisitor) {
- return exprVisitor.visitLiteral(this);
+ return exprVisitor.visitBoundReference(this);
+ }
+
+ @Override
+ public String toString() {
+ return "boundReference[ordinal: " + ordinal + ", type: "
+ + type.typeId().getName() + "]";
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/expression/Comparators.java
b/hudi-common/src/main/java/org/apache/hudi/expression/Comparators.java
new file mode 100644
index 00000000000..14e208197f1
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/Comparators.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Comparators {
+
+ private static final Map<Type, Comparator<?>> COMPARATORS =
Collections.unmodifiableMap(
+ new HashMap<Type, Comparator<?>>() {
+ {
+ put(Types.BooleanType.get(), Comparator.naturalOrder());
+ put(Types.IntType.get(), Comparator.naturalOrder());
+ put(Types.LongType.get(), Comparator.naturalOrder());
+ put(Types.FloatType.get(), Comparator.naturalOrder());
+ put(Types.DoubleType.get(), Comparator.naturalOrder());
+ put(Types.DateType.get(), Comparator.naturalOrder());
+ put(Types.TimeType.get(), Comparator.naturalOrder());
+ put(Types.TimestampType.get(), Comparator.naturalOrder());
+ put(Types.StringType.get(), Comparator.naturalOrder());
+ put(Types.UUIDType.get(), Comparator.naturalOrder());
+ }
+ });
+
+ public static <T> Comparator<T> forType(Type.PrimitiveType type) {
+ return (Comparator<T>) Option.ofNullable(COMPARATORS.get(type))
+ .orElseThrow(() -> new UnsupportedOperationException("The desired type
" + type + " doesn't support comparator yet"));
+ }
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Expression.java
b/hudi-common/src/main/java/org/apache/hudi/expression/Expression.java
similarity index 64%
rename from
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Expression.java
rename to hudi-common/src/main/java/org/apache/hudi/expression/Expression.java
index 8a64237ae1b..5052a35b852 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Expression.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/Expression.java
@@ -16,20 +16,31 @@
* limitations under the License.
*/
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
+import org.apache.hudi.internal.schema.Type;
+
+import java.io.Serializable;
import java.util.List;
-public abstract class Expression {
+public interface Expression extends Serializable {
- public enum Operator {
+ enum Operator {
+ TRUE("TRUE", "TRUE"),
+ FALSE("FALSE", "FALSE"),
AND("AND", "&&"),
OR("OR", "||"),
GT(">", ">"),
LT("<", "<"),
EQ("=", "="),
GT_EQ(">=", ">="),
- LT_EQ("<=", "<=");
+ LT_EQ("<=", "<="),
+ STARTS_WITH(null, null),
+ CONTAINS(null, null),
+ IS_NULL(null, null),
+ IS_NOT_NULL(null, null),
+ IN("IN", "IN"),
+ NOT("NOT", "NOT");
public final String sqlOperator;
public final String symbol;
@@ -40,14 +51,19 @@ public abstract class Expression {
}
}
- private final List<Expression> children;
+ List<Expression> getChildren();
+
+ Type getDataType();
- public Expression(List<Expression> children) {
- this.children = children;
+ default Object eval(StructLike data) {
+ throw new UnsupportedOperationException("Cannot evaluate expression " +
this);
}
/**
* Traverses the expression with the provided {@link ExpressionVisitor}
*/
- public abstract <T> T accept(ExpressionVisitor<T> exprVisitor);
+ <T> T accept(ExpressionVisitor<T> exprVisitor);
+
+ @Override
+ String toString();
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/ExpressionVisitor.java
b/hudi-common/src/main/java/org/apache/hudi/expression/ExpressionVisitor.java
similarity index 76%
rename from
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/ExpressionVisitor.java
rename to
hudi-common/src/main/java/org/apache/hudi/expression/ExpressionVisitor.java
index 9f6ea7b1597..10a0362c786 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/ExpressionVisitor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/expression/ExpressionVisitor.java
@@ -16,16 +16,26 @@
* limitations under the License.
*/
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
/**
* Visitor used to traverse the expression.
*/
public interface ExpressionVisitor<T> {
- T visitBinaryOperator(BinaryOperator binaryOperator);
+ T alwaysTrue();
+
+ T alwaysFalse();
T visitLiteral(Literal literal);
- T visitAttribute(AttributeReferenceExpression attribute);
+ T visitNameReference(NameReference attribute);
+
+ T visitBoundReference(BoundReference boundReference);
+
+ T visitAnd(Predicates.And and);
+
+ T visitOr(Predicates.Or or);
+
+ T visitPredicate(Predicate predicate);
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java
b/hudi-common/src/main/java/org/apache/hudi/expression/LeafExpression.java
similarity index 81%
copy from
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java
copy to hudi-common/src/main/java/org/apache/hudi/expression/LeafExpression.java
index 318a79116ca..c85f2095a4f 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/LeafExpression.java
@@ -16,14 +16,17 @@
* limitations under the License.
*/
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
+
+import java.util.List;
/**
* Expression without any child expressions.
*/
-public abstract class LeafExpression extends Expression {
+public abstract class LeafExpression implements Expression {
- public LeafExpression() {
- super(null);
+ @Override
+ public List<Expression> getChildren() {
+ return null;
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/Literal.java
b/hudi-common/src/main/java/org/apache/hudi/expression/Literal.java
new file mode 100644
index 00000000000..01fbdb1a1c8
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/Literal.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import javax.xml.bind.DatatypeConverter;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+public class Literal<T> extends LeafExpression {
+
+ public static <V> Literal from(V value) {
+ if (value instanceof Integer || value instanceof Short) {
+ return new Literal<>(value, Types.IntType.get());
+ }
+
+ if (value instanceof Byte) {
+ return new Literal<>(((Byte)value).intValue(), Types.IntType.get());
+ }
+
+ if (value instanceof Long) {
+ return new Literal<>(value, Types.LongType.get());
+ }
+
+ if (value instanceof Boolean) {
+ return new Literal<>(value, Types.BooleanType.get());
+ }
+
+ if (value instanceof Double) {
+ return new Literal<>(value, Types.DoubleType.get());
+ }
+
+ if (value instanceof Float) {
+ return new Literal<>(value, Types.FloatType.get());
+ }
+
+ if (value instanceof BigDecimal) {
+ BigDecimal decimal = (BigDecimal) value;
+ return new Literal<>(value, Types.DecimalType.get(decimal.precision(),
decimal.scale()));
+ }
+
+ if (value instanceof CharSequence) {
+ return new Literal<>(value, Types.StringType.get());
+ }
+
+ if (value instanceof byte[]) {
+ byte[] bytes = (byte[]) value;
+ return new Literal<>(ByteBuffer.wrap(bytes),
Types.FixedType.getFixed(bytes.length));
+ }
+
+ if (value instanceof ByteBuffer) {
+ return new Literal<>(value, Types.BinaryType.get());
+ }
+
+ if (value instanceof UUID) {
+ return new Literal<>(value, Types.UUIDType.get());
+ }
+
+ throw new IllegalArgumentException("Cannot convert value from class "
+ + value.getClass().getName() + " to Literal");
+ }
+
+ private final T value;
+ private final Type type;
+
+ public Literal(T value, Type type) {
+ this.value = value;
+ this.type = type;
+ }
+
+ public T getValue() {
+ return value;
+ }
+
+ @Override
+ public Type getDataType() {
+ return type;
+ }
+
+ @Override
+ public Object eval(StructLike data) {
+ return value;
+ }
+
+ @Override
+ public <T> T accept(ExpressionVisitor<T> exprVisitor) {
+ return exprVisitor.visitLiteral(this);
+ }
+
+ @Override
+ public String toString() {
+ if (value == null) {
+ return "null";
+ }
+
+ if (value instanceof ByteBuffer) {
+ return DatatypeConverter.printHexBinary(((ByteBuffer)value).array());
+ }
+
+ return value.toString();
+ }
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java
b/hudi-common/src/main/java/org/apache/hudi/expression/NameReference.java
similarity index 69%
rename from
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java
rename to
hudi-common/src/main/java/org/apache/hudi/expression/NameReference.java
index 3be19330a19..7d41a61791f 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/NameReference.java
@@ -16,13 +16,15 @@
* limitations under the License.
*/
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
-public class AttributeReferenceExpression extends LeafExpression {
+import org.apache.hudi.internal.schema.Type;
+
+public class NameReference extends LeafExpression {
private final String name;
- public AttributeReferenceExpression(String name) {
+ public NameReference(String name) {
this.name = name;
}
@@ -30,8 +32,18 @@ public class AttributeReferenceExpression extends
LeafExpression {
return name;
}
+ @Override
+ public Type getDataType() {
+ throw new UnsupportedOperationException("NameReference is not bound yet");
+ }
+
@Override
public <T> T accept(ExpressionVisitor<T> exprVisitor) {
- return exprVisitor.visitAttribute(this);
+ return exprVisitor.visitNameReference(this);
+ }
+
+ @Override
+ public String toString() {
+ return "NameReference(name=" + name + ")";
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java
b/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java
new file mode 100644
index 00000000000..cece36291df
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Will try to bind all references, and convert unresolved references to AlwaysTrue.
+ *
+ * e.g. `year=2023 AND day=12`, if year and day both are provided to `recordType`,
+ * then the expression won't change, if day is not provided, the expression will be
+ * transformed to `year=2023 AND True`, which will be optimized to `year=2023`.
+ */
+public class PartialBindVisitor extends BindVisitor {
+
+  public PartialBindVisitor(Types.RecordType recordType, boolean caseSensitive) {
+ super(recordType, caseSensitive);
+ }
+
+ /**
+   * If the attribute cannot be found in the schema, directly return null; visitPredicate
+   * will handle it.
+ */
+ @Override
+ public Expression visitNameReference(NameReference attribute) {
+ Types.Field field = caseSensitive
+ ? recordType.fieldByName(attribute.getName())
+ : recordType.fieldByNameCaseInsensitive(attribute.getName());
+
+ if (field == null) {
+ return null;
+ }
+
+ return new BoundReference(field.fieldId(), field.type());
+ }
+
+ /**
+   * If an expression is null after the accept method, it means it cannot be bound from
+ * schema, we'll directly return {@link Predicates.TrueExpression}.
+ */
+ @Override
+ public Expression visitPredicate(Predicate predicate) {
+
+ if (predicate instanceof Predicates.BinaryComparison) {
+      Predicates.BinaryComparison binaryExp = (Predicates.BinaryComparison) predicate;
+ Expression left = binaryExp.getLeft().accept(this);
+ if (left == null) {
+ return alwaysTrue();
+ } else {
+ Expression right = binaryExp.getRight().accept(this);
+ if (right == null) {
+ return alwaysTrue();
+ }
+
+        return new Predicates.BinaryComparison(left, binaryExp.getOperator(), right);
+ }
+ }
+
+ if (predicate instanceof Predicates.Not) {
+ Expression expr = ((Predicates.Not) predicate).child.accept(this);
+ if (expr instanceof Predicates.TrueExpression) {
+ return alwaysFalse();
+ }
+ if (expr instanceof Predicates.FalseExpression) {
+ return alwaysTrue();
+ }
+
+ return Predicates.not(expr);
+ }
+
+ if (predicate instanceof Predicates.In) {
+ Predicates.In in = ((Predicates.In) predicate);
+ Expression valueExpression = in.value.accept(this);
+ if (valueExpression == null) {
+ return alwaysTrue();
+ }
+ List<Expression> validValues = in.validValues.stream()
+ .map(validValue -> validValue.accept(this))
+ .collect(Collectors.toList());
+ if (validValues.stream().anyMatch(Objects::isNull)) {
+ return alwaysTrue();
+ }
+ return Predicates.in(valueExpression, validValues);
+ }
+
+ if (predicate instanceof Predicates.IsNull) {
+ Predicates.IsNull isNull = (Predicates.IsNull) predicate;
+ return Option.ofNullable(isNull.child.accept(this))
+ .map(expr -> (Expression)Predicates.isNull(expr))
+ .orElse(alwaysTrue());
+ }
+
+ if (predicate instanceof Predicates.IsNotNull) {
+ Predicates.IsNotNull isNotNull = (Predicates.IsNotNull) predicate;
+ return Option.ofNullable(isNotNull.child.accept(this))
+ .map(expr -> (Expression)Predicates.isNotNull(expr))
+ .orElse(alwaysTrue());
+ }
+
+ if (predicate instanceof Predicates.StringStartsWith) {
+      Predicates.StringStartsWith startsWith = (Predicates.StringStartsWith) predicate;
+ Expression left = startsWith.getLeft().accept(this);
+ if (left == null) {
+ return alwaysTrue();
+ } else {
+ Expression right = startsWith.getRight().accept(this);
+ if (right == null) {
+ return alwaysTrue();
+ }
+
+ return Predicates.startsWith(left, right);
+ }
+ }
+
+ if (predicate instanceof Predicates.StringContains) {
+      Predicates.StringContains contains = (Predicates.StringContains) predicate;
+ Expression left = contains.getLeft().accept(this);
+ if (left == null) {
+ return alwaysTrue();
+ } else {
+ Expression right = contains.getRight().accept(this);
+ if (right == null) {
+ return alwaysTrue();
+ }
+
+ return Predicates.contains(left, right);
+ }
+ }
+
+    throw new IllegalArgumentException("The expression " + predicate + " cannot be visited as predicate");
+ }
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java
b/hudi-common/src/main/java/org/apache/hudi/expression/Predicate.java
similarity index 65%
rename from
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java
rename to hudi-common/src/main/java/org/apache/hudi/expression/Predicate.java
index f84756a6a85..220267870dc 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/Predicate.java
@@ -16,28 +16,25 @@
* limitations under the License.
*/
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
-public class Literal extends LeafExpression {
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
- private final String value;
- private final String type;
-
- public Literal(String value, String type) {
- this.value = value;
- this.type = type;
- }
+/**
+ * An expression that returns a Boolean value.
+ */
+public interface Predicate extends Expression {
- public String getValue() {
- return value;
+ @Override
+ default Type getDataType() {
+ return Types.BooleanType.get();
}
- public String getType() {
- return type;
- }
+ Operator getOperator();
@Override
- public <T> T accept(ExpressionVisitor<T> exprVisitor) {
- return exprVisitor.visitLiteral(this);
+ default <T> T accept(ExpressionVisitor<T> exprVisitor) {
+ return exprVisitor.visitPredicate(this);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/expression/Predicates.java
b/hudi-common/src/main/java/org/apache/hudi/expression/Predicates.java
new file mode 100644
index 00000000000..11c4f39507f
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/Predicates.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.internal.schema.Type;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class Predicates {
+
+ public static TrueExpression alwaysTrue() {
+ return TrueExpression.get();
+ }
+
+ public static FalseExpression alwaysFalse() {
+ return FalseExpression.get();
+ }
+
+ public static And and(Expression left, Expression right) {
+ return new And(left, right);
+ }
+
+ public static Or or(Expression left, Expression right) {
+ return new Or(left, right);
+ }
+
+ public static BinaryComparison gt(Expression left, Expression right) {
+ return new BinaryComparison(left, Expression.Operator.GT, right);
+ }
+
+ public static BinaryComparison lt(Expression left, Expression right) {
+ return new BinaryComparison(left, Expression.Operator.LT, right);
+ }
+
+ public static BinaryComparison eq(Expression left, Expression right) {
+ return new BinaryComparison(left, Expression.Operator.EQ, right);
+ }
+
+ public static BinaryComparison gteq(Expression left, Expression right) {
+ return new BinaryComparison(left, Expression.Operator.GT_EQ, right);
+ }
+
+ public static BinaryComparison lteq(Expression left, Expression right) {
+ return new BinaryComparison(left, Expression.Operator.LT_EQ, right);
+ }
+
+  public static StringStartsWith startsWith(Expression left, Expression right) {
+ return new StringStartsWith(left, right);
+ }
+
+ public static StringContains contains(Expression left, Expression right) {
+ return new StringContains(left, right);
+ }
+
+ public static In in(Expression left, List<Expression> validExpressions) {
+ return new In(left, validExpressions);
+ }
+
+ public static IsNull isNull(Expression child) {
+ return new IsNull(child);
+ }
+
+ public static IsNotNull isNotNull(Expression child) {
+ return new IsNotNull(child);
+ }
+
+ public static Not not(Expression expr) {
+ return new Not(expr);
+ }
+
+  public static class TrueExpression extends LeafExpression implements Predicate {
+
+ private static final TrueExpression INSTANCE = new TrueExpression();
+
+ public static TrueExpression get() {
+ return INSTANCE;
+ }
+
+ @Override
+ public Boolean eval(StructLike data) {
+ return true;
+ }
+
+ @Override
+ public Operator getOperator() {
+ return Operator.TRUE;
+ }
+
+ @Override
+ public <T> T accept(ExpressionVisitor<T> exprVisitor) {
+ return exprVisitor.alwaysTrue();
+ }
+
+ @Override
+ public String toString() {
+ return "TRUE";
+ }
+ }
+
+  public static class FalseExpression extends LeafExpression implements Predicate {
+
+ private static final FalseExpression INSTANCE = new FalseExpression();
+
+ public static FalseExpression get() {
+ return INSTANCE;
+ }
+
+ @Override
+ public Boolean eval(StructLike data) {
+ return false;
+ }
+
+ @Override
+ public Operator getOperator() {
+ return Operator.FALSE;
+ }
+
+ @Override
+ public <T> T accept(ExpressionVisitor<T> exprVisitor) {
+ return exprVisitor.alwaysFalse();
+ }
+
+ @Override
+ public String toString() {
+ return "FALSE";
+ }
+ }
+
+ public static class And extends BinaryExpression implements Predicate {
+
+ public And(Expression left, Expression right) {
+ super(left, Operator.AND, right);
+ }
+
+ @Override
+ public Boolean eval(StructLike data) {
+      if (getLeft() instanceof FalseExpression || getRight() instanceof FalseExpression) {
+ return false;
+ }
+ Object left = getLeft().eval(data);
+ if (left != null && !(Boolean) left) {
+ return false;
+ } else {
+ Object right = getRight().eval(data);
+ if (right != null && !(Boolean) right) {
+ return false;
+ } else {
+ if (left != null && right != null) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+ }
+
+ @Override
+ public <T> T accept(ExpressionVisitor<T> exprVisitor) {
+ return exprVisitor.visitAnd(this);
+ }
+
+ @Override
+ public String toString() {
+      return "(" + getLeft() + " " + getOperator().symbol + " " + getRight() + ")";
+ }
+ }
+
+ public static class Or extends BinaryExpression implements Predicate {
+
+ public Or(Expression left, Expression right) {
+ super(left, Operator.OR, right);
+ }
+
+ @Override
+ public Boolean eval(StructLike data) {
+      if (getLeft() instanceof TrueExpression || getRight() instanceof TrueExpression) {
+ return true;
+ }
+
+ Object left = getLeft().eval(data);
+
+ if (left == null) {
+ return false;
+ }
+
+ if ((Boolean) left) {
+ return true;
+ } else {
+ Object right = getRight().eval(data);
+ return right != null && (Boolean) right;
+ }
+ }
+
+ @Override
+ public <T> T accept(ExpressionVisitor<T> exprVisitor) {
+ return exprVisitor.visitOr(this);
+ }
+
+ @Override
+ public String toString() {
+      return "(" + getLeft() + " " + getOperator().symbol + " " + getRight() + ")";
+ }
+ }
+
+  public static class StringStartsWith extends BinaryExpression implements Predicate {
+
+ StringStartsWith(Expression left, Expression right) {
+ super(left, Operator.STARTS_WITH, right);
+ }
+
+ @Override
+ public String toString() {
+      return getLeft().toString() + ".startWith(" + getRight().toString() + ")";
+ }
+
+ @Override
+ public Object eval(StructLike data) {
+      return getLeft().eval(data).toString().startsWith(getRight().eval(data).toString());
+ }
+ }
+
+  public static class StringContains extends BinaryExpression implements Predicate {
+
+ StringContains(Expression left, Expression right) {
+ super(left, Operator.CONTAINS, right);
+ }
+
+ @Override
+ public String toString() {
+ return getLeft().toString() + ".contains(" + getRight().toString() + ")";
+ }
+
+ @Override
+ public Object eval(StructLike data) {
+      return getLeft().eval(data).toString().contains(getRight().eval(data).toString());
+ }
+ }
+
+ public static class In implements Predicate {
+
+ protected final Expression value;
+ protected final List<Expression> validValues;
+
+ public In(Expression value, List<Expression> validValues) {
+ this.value = value;
+ this.validValues = validValues;
+ }
+
+ @Override
+ public List<Expression> getChildren() {
+ ArrayList<Expression> children = new ArrayList<>(validValues.size() + 1);
+ children.add(value);
+ children.addAll(validValues);
+ return children;
+ }
+
+ @Override
+ public Boolean eval(StructLike data) {
+ Set<Object> values = validValues.stream()
+ .map(validValue -> validValue.eval(data))
+ .collect(Collectors.toSet());
+ return values.contains(value.eval(data));
+ }
+
+ @Override
+ public Operator getOperator() {
+ return Operator.IN;
+ }
+
+ @Override
+ public String toString() {
+      return value.toString() + " " + getOperator().symbol + " "
+          + validValues.stream().map(Expression::toString).collect(Collectors.joining(",", "(", ")"));
+ }
+ }
+
+ public static class IsNull implements Predicate {
+
+ protected final Expression child;
+
+ public IsNull(Expression child) {
+ this.child = child;
+ }
+
+ @Override
+ public List<Expression> getChildren() {
+ return Collections.singletonList(child);
+ }
+
+ @Override
+ public Boolean eval(StructLike data) {
+ return child.eval(data) == null;
+ }
+
+ @Override
+ public Operator getOperator() {
+ return Operator.IS_NULL;
+ }
+
+ @Override
+ public String toString() {
+ return child.toString() + " IS NULL";
+ }
+ }
+
+ public static class IsNotNull implements Predicate {
+
+ protected final Expression child;
+
+ public IsNotNull(Expression child) {
+ this.child = child;
+ }
+
+ @Override
+ public List<Expression> getChildren() {
+ return Collections.singletonList(child);
+ }
+
+ @Override
+ public Boolean eval(StructLike data) {
+ return child.eval(data) != null;
+ }
+
+ @Override
+ public Operator getOperator() {
+ return Operator.IS_NOT_NULL;
+ }
+
+ @Override
+ public String toString() {
+ return child.toString() + " IS NOT NULL";
+ }
+ }
+
+ public static class Not implements Predicate {
+
+ Expression child;
+
+ public Not(Expression child) {
+ this.child = child;
+ }
+
+ @Override
+ public List<Expression> getChildren() {
+ return Collections.singletonList(child);
+ }
+
+ @Override
+ public Boolean eval(StructLike data) {
+ return ! (Boolean) child.eval(data);
+ }
+
+ @Override
+ public Operator getOperator() {
+ return Operator.NOT;
+ }
+
+ @Override
+ public String toString() {
+ return "NOT " + child;
+ }
+ }
+
+  public static class BinaryComparison extends BinaryExpression implements Predicate {
+
+    public BinaryComparison(Expression left, Operator operator, Expression right) {
+ super(left, operator, right);
+ }
+
+ @Override
+ public Boolean eval(StructLike data) {
+      if (getLeft().getDataType().isNestedType()) {
+        throw new IllegalArgumentException("The nested type doesn't support binary comparison");
+      }
+      Comparator<Object> comparator = Comparators.forType((Type.PrimitiveType) getLeft().getDataType());
+      switch (getOperator()) {
+        case EQ:
+          return comparator.compare(getLeft().eval(data), getRight().eval(data)) == 0;
+        case GT:
+          return comparator.compare(getLeft().eval(data), getRight().eval(data)) > 0;
+        case GT_EQ:
+          return comparator.compare(getLeft().eval(data), getRight().eval(data)) >= 0;
+        case LT:
+          return comparator.compare(getLeft().eval(data), getRight().eval(data)) < 0;
+        case LT_EQ:
+          return comparator.compare(getLeft().eval(data), getRight().eval(data)) <= 0;
+        default:
+          throw new IllegalArgumentException("The operation " + getOperator() + " doesn't support binary comparison");
+      }
+    }
+ }
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java
b/hudi-common/src/main/java/org/apache/hudi/expression/StructLike.java
similarity index 79%
rename from
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java
rename to hudi-common/src/main/java/org/apache/hudi/expression/StructLike.java
index 318a79116ca..d6346b8c657 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/StructLike.java
@@ -16,14 +16,11 @@
* limitations under the License.
*/
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
-/**
- * Expression that without any child expressions.
- */
-public abstract class LeafExpression extends Expression {
+public interface StructLike {
+
+ int numFields();
- public LeafExpression() {
- super(null);
- }
+ <T> T get(int pos, Class<T> classTag);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
index d51ab37719a..bc8b89004d6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
@@ -18,10 +18,22 @@
package org.apache.hudi.internal.schema;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+
import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
/**
* The type of a schema, reference avro schema.
@@ -29,21 +41,45 @@ import java.util.Objects;
* to do add support for localTime if avro version is updated
*/
public interface Type extends Serializable {
+
+ OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+ LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
/**
* Enums for type names.
*/
enum TypeID {
- RECORD, ARRAY, MAP, FIXED, STRING, BINARY,
- INT, LONG, FLOAT, DOUBLE, DATE, BOOLEAN, TIME, TIMESTAMP, DECIMAL, UUID;
- private String name;
-
- TypeID() {
+ RECORD(Types.RecordType.class),
+ ARRAY(List.class),
+ MAP(Map.class),
+ FIXED(ByteBuffer.class),
+ STRING(String.class),
+ BINARY(ByteBuffer.class),
+ INT(Integer.class),
+ LONG(Long.class),
+ FLOAT(Float.class),
+ DOUBLE(Double.class),
+ DATE(Integer.class),
+ BOOLEAN(Boolean.class),
+ TIME(Long.class),
+ TIMESTAMP(Long.class),
+ DECIMAL(BigDecimal.class),
+ UUID(UUID.class);
+ private final String name;
+ private final Class<?> classTag;
+
+ TypeID(Class<?> classTag) {
this.name = this.name().toLowerCase(Locale.ROOT);
+ this.classTag = classTag;
}
public String getName() {
return name;
}
+
+ public Class<?> getClassTag() {
+ return classTag;
+ }
}
static TypeID fromValue(String value) {
@@ -54,6 +90,40 @@ public interface Type extends Serializable {
}
}
+ static Object fromPartitionString(String partitionValue, Type type) {
+    if (partitionValue == null
+        || PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH.equals(partitionValue)
+        || PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH.equals(partitionValue)) {
+ return null;
+ }
+
+ switch (type.typeId()) {
+ case INT:
+ return Integer.parseInt(partitionValue);
+ case LONG:
+ return Long.parseLong(partitionValue);
+ case BOOLEAN:
+ return Boolean.parseBoolean(partitionValue);
+ case FLOAT:
+ return Float.parseFloat(partitionValue);
+ case DECIMAL:
+ return new BigDecimal(partitionValue);
+ case DOUBLE:
+ return Double.parseDouble(partitionValue);
+ case UUID:
+ return UUID.fromString(partitionValue);
+ case DATE:
+ // TODO Support different date format
+        return Math.toIntExact(ChronoUnit.DAYS.between(
+            EPOCH_DAY, LocalDate.parse(partitionValue, DateTimeFormatter.ISO_LOCAL_DATE)));
+ case STRING:
+ return partitionValue;
+ default:
+        throw new UnsupportedOperationException("Cast value " + partitionValue
+            + " to type " + type + " is not supported yet");
+ }
+ }
+
TypeID typeId();
default boolean isNestedType() {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
index 3d2774bc19b..ed03a7349cb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
@@ -23,7 +23,6 @@ import org.apache.hudi.internal.schema.Type.PrimitiveType;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -511,6 +510,7 @@ public class Types {
private final Field[] fields;
private transient Map<String, Field> nameToFields = null;
+ private transient Map<String, Field> lowercaseNameToFields = null;
private transient Map<Integer, Field> idToFields = null;
private RecordType(List<Field> fields, String name) {
@@ -523,30 +523,43 @@ public class Types {
return Arrays.asList(fields);
}
- public Field field(String name) {
+ /**
+ * Case-sensitive get field by name
+ */
+ public Field fieldByName(String name) {
if (nameToFields == null) {
- nameToFields = new HashMap<>();
- for (Field field : fields) {
- nameToFields.put(field.name().toLowerCase(Locale.ROOT), field);
- }
+ nameToFields = Arrays.stream(fields)
+ .collect(Collectors.toMap(
+ Field::name,
+ field -> field));
+ }
+ return nameToFields.get(name);
+ }
+
+ public Field fieldByNameCaseInsensitive(String name) {
+ if (lowercaseNameToFields == null) {
+ lowercaseNameToFields = Arrays.stream(fields)
+ .collect(Collectors.toMap(
+ field -> field.name.toLowerCase(Locale.ROOT),
+ field -> field));
}
- return nameToFields.get(name.toLowerCase(Locale.ROOT));
+ return lowercaseNameToFields.get(name.toLowerCase(Locale.ROOT));
}
@Override
public Field field(int id) {
if (idToFields == null) {
- idToFields = new HashMap<>();
- for (Field field : fields) {
- idToFields.put(field.fieldId(), field);
- }
+ idToFields = Arrays.stream(fields)
+ .collect(Collectors.toMap(
+ Field::fieldId,
+ field -> field));
}
return idToFields.get(id);
}
@Override
public Type fieldType(String name) {
- Field field = field(name);
+ Field field = fieldByNameCaseInsensitive(name);
if (field != null) {
return field.type();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/Conversions.java
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/Conversions.java
new file mode 100644
index 00000000000..67adff88ac7
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/Conversions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+public class Conversions {
+
+  private static final HashSet<Type.TypeID> SUPPORTED_PARTITION_TYPES = new HashSet<>(
+ Arrays.asList(Type.TypeID.INT,
+ Type.TypeID.LONG,
+ Type.TypeID.BOOLEAN,
+ Type.TypeID.FLOAT,
+ Type.TypeID.DECIMAL,
+ Type.TypeID.DOUBLE,
+ Type.TypeID.UUID,
+ Type.TypeID.DATE,
+ Type.TypeID.STRING));
+
+  public static boolean isPartitionSchemaSupportedConversion(Types.RecordType schema) {
+ for (Types.Field field: schema.fields()) {
+ if (!SUPPORTED_PARTITION_TYPES.contains(field.type().typeId())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java
new file mode 100644
index 00000000000..f62786e9517
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.expression.ArrayData;
+import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.hadoop.SerializablePath;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public abstract class AbstractHoodieTableMetadata implements HoodieTableMetadata {
+
+ protected transient HoodieEngineContext engineContext;
+
+ protected final SerializableConfiguration hadoopConf;
+ protected final SerializablePath dataBasePath;
+
+ // TODO get this from HoodieConfig
+ protected final boolean caseSensitive = false;
+
+  public AbstractHoodieTableMetadata(HoodieEngineContext engineContext, SerializableConfiguration conf, String dataBasePath) {
+ this.engineContext = engineContext;
+ this.hadoopConf = conf;
+ this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath));
+ }
+
+  protected static int getPathPartitionLevel(Types.RecordType partitionFields, String path) {
+ if (StringUtils.isNullOrEmpty(path) || partitionFields == null) {
+ return 0;
+ }
+
+ int level = 1;
+ for (int i = 1; i < path.length() - 1; i++) {
+ if (path.charAt(i) == Path.SEPARATOR_CHAR) {
+ level++;
+ }
+ }
+ if (path.startsWith(Path.SEPARATOR)) {
+ level--;
+ }
+ if (path.endsWith(Path.SEPARATOR)) {
+ level--;
+ }
+ return level;
+ }
+
+  protected static ArrayData extractPartitionValues(Types.RecordType partitionFields,
+                                                    String relativePartitionPath,
+                                                    boolean urlEncodePartitioningEnabled) {
+ if (partitionFields.fields().size() == 1) {
+ // SinglePartPartitionValue, which might contain slashes.
+ String partitionValue = relativePartitionPath.split("=")[1];
+ return new ArrayData(Collections.singletonList(Type.fromPartitionString(
+          urlEncodePartitioningEnabled ? PartitionPathEncodeUtils.unescapePathName(partitionValue) : partitionValue,
+ partitionFields.field(0).type())));
+ }
+
+ List<Object> partitionValues;
+ String[] partitionFragments = relativePartitionPath.split("/");
+ partitionValues = IntStream.range(0, partitionFragments.length)
+ .mapToObj(idx -> {
+ String partitionValue = partitionFragments[idx].split("=")[1];
+ return Type.fromPartitionString(
+              urlEncodePartitioningEnabled ? PartitionPathEncodeUtils.unescapePathName(partitionValue) : partitionValue,
+ partitionFields.field(idx).type());
+ }).collect(Collectors.toList());
+
+ return new ArrayData(partitionValues);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 076eb4bf1dc..7e1acf3a87c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -29,8 +29,8 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -41,8 +41,6 @@ import org.apache.hudi.common.util.hash.FileIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
-import org.apache.hudi.hadoop.CachingPath;
-import org.apache.hudi.hadoop.SerializablePath;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -68,25 +66,33 @@ import java.util.stream.Collectors;
/**
* Abstract class for implementing common table metadata operations.
*/
-public abstract class BaseTableMetadata implements HoodieTableMetadata {
+public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata {
private static final Logger LOG =
LoggerFactory.getLogger(BaseTableMetadata.class);
- protected transient HoodieEngineContext engineContext;
- protected final SerializablePath dataBasePath;
+ protected static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+  // NOTE: Buffer-size is deliberately set pretty low, since MT internally is relying
+  //       on HFile (serving as persisted binary key-value mapping) to do caching
+ protected static final int BUFFER_SIZE = 10 * 1024; // 10Kb
+
protected final HoodieTableMetaClient dataMetaClient;
protected final Option<HoodieMetadataMetrics> metrics;
protected final HoodieMetadataConfig metadataConfig;
protected boolean isMetadataTableInitialized;
+ protected final boolean hiveStylePartitioningEnabled;
+ protected final boolean urlEncodePartitioningEnabled;
protected BaseTableMetadata(HoodieEngineContext engineContext,
HoodieMetadataConfig metadataConfig, String dataBasePath) {
- this.engineContext = engineContext;
- this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath));
+ super(engineContext, engineContext.getHadoopConf(), dataBasePath);
+
this.dataMetaClient = HoodieTableMetaClient.builder()
- .setConf(engineContext.getHadoopConf().get())
+ .setConf(hadoopConf.get())
.setBasePath(dataBasePath)
.build();
+
+ this.hiveStylePartitioningEnabled =
Boolean.parseBoolean(dataMetaClient.getTableConfig().getHiveStylePartitioningEnable());
+ this.urlEncodePartitioningEnabled =
Boolean.parseBoolean(dataMetaClient.getTableConfig().getUrlEncodePartitioning());
this.metadataConfig = metadataConfig;
this.isMetadataTableInitialized =
dataMetaClient.getTableConfig().isMetadataTableAvailable();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 9ce83a7d953..10ed196714d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
@@ -33,6 +34,12 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.expression.BindVisitor;
+import org.apache.hudi.expression.Expression;
+import org.apache.hudi.expression.PartialBindVisitor;
+import org.apache.hudi.expression.Predicates;
+import org.apache.hudi.internal.schema.Types;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -52,20 +59,36 @@ import java.util.stream.Stream;
/**
* Implementation of {@link HoodieTableMetadata} based file-system-backed
table metadata.
*/
-public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
+public class FileSystemBackedTableMetadata extends AbstractHoodieTableMetadata
{
private static final int DEFAULT_LISTING_PARALLELISM = 1500;
- private final transient HoodieEngineContext engineContext;
- private final SerializableConfiguration hadoopConf;
- private final String datasetBasePath;
private final boolean assumeDatePartitioning;
- public FileSystemBackedTableMetadata(HoodieEngineContext engineContext,
SerializableConfiguration conf, String datasetBasePath,
+ private final boolean hiveStylePartitioningEnabled;
+ private final boolean urlEncodePartitioningEnabled;
+
+ public FileSystemBackedTableMetadata(HoodieEngineContext engineContext,
HoodieTableConfig tableConfig,
+ SerializableConfiguration conf, String
datasetBasePath,
boolean assumeDatePartitioning) {
- this.engineContext = engineContext;
- this.hadoopConf = conf;
- this.datasetBasePath = datasetBasePath;
+ super(engineContext, conf, datasetBasePath);
+
+ this.hiveStylePartitioningEnabled =
Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
+ this.urlEncodePartitioningEnabled =
Boolean.parseBoolean(tableConfig.getUrlEncodePartitioning());
+ this.assumeDatePartitioning = assumeDatePartitioning;
+ }
+
+ public FileSystemBackedTableMetadata(HoodieEngineContext engineContext,
+ SerializableConfiguration conf, String
datasetBasePath,
+ boolean assumeDatePartitioning) {
+ super(engineContext, conf, datasetBasePath);
+
+ FileSystem fs = FSUtils.getFs(dataBasePath.get(), conf.get());
+ Path metaPath = new Path(dataBasePath.get(),
HoodieTableMetaClient.METAFOLDER_NAME);
+ TableNotFoundException.checkTableValidity(fs, this.dataBasePath.get(),
metaPath);
+ HoodieTableConfig tableConfig = new HoodieTableConfig(fs,
metaPath.toString(), null, null);
+ this.hiveStylePartitioningEnabled =
Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
+ this.urlEncodePartitioningEnabled =
Boolean.parseBoolean(tableConfig.getUrlEncodePartitioning());
this.assumeDatePartitioning = assumeDatePartitioning;
}
@@ -77,15 +100,29 @@ public class FileSystemBackedTableMetadata implements
HoodieTableMetadata {
@Override
public List<String> getAllPartitionPaths() throws IOException {
- Path basePath = new Path(datasetBasePath);
+ Path basePath = dataBasePath.get();
if (assumeDatePartitioning) {
FileSystem fs = basePath.getFileSystem(hadoopConf.get());
- return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs,
datasetBasePath);
+ return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs,
dataBasePath.toString());
}
return getPartitionPathWithPathPrefixes(Collections.singletonList(""));
}
+ @Override
+ public List<String>
getPartitionPathWithPathPrefixUsingFilterExpression(List<String>
relativePathPrefixes,
+
Types.RecordType partitionFields,
+
Expression expression) throws IOException {
+ return relativePathPrefixes.stream().flatMap(relativePathPrefix -> {
+ try {
+ return
getPartitionPathWithPathPrefixUsingFilterExpression(relativePathPrefix,
+ partitionFields, expression).stream();
+ } catch (IOException e) {
+ throw new HoodieIOException("Error fetching partition paths with
relative path: " + relativePathPrefix, e);
+ }
+ }).collect(Collectors.toList());
+ }
+
@Override
public List<String> getPartitionPathWithPathPrefixes(List<String>
relativePathPrefixes) {
return relativePathPrefixes.stream().flatMap(relativePathPrefix -> {
@@ -98,11 +135,35 @@ public class FileSystemBackedTableMetadata implements
HoodieTableMetadata {
}
private List<String> getPartitionPathWithPathPrefix(String
relativePathPrefix) throws IOException {
+ return
getPartitionPathWithPathPrefixUsingFilterExpression(relativePathPrefix, null,
null);
+ }
+
+ private List<String>
getPartitionPathWithPathPrefixUsingFilterExpression(String relativePathPrefix,
+
Types.RecordType partitionFields,
+
Expression pushedExpr) throws IOException {
List<Path> pathsToList = new CopyOnWriteArrayList<>();
pathsToList.add(StringUtils.isNullOrEmpty(relativePathPrefix)
- ? new Path(datasetBasePath) : new Path(datasetBasePath,
relativePathPrefix));
+ ? dataBasePath.get() : new Path(dataBasePath.get(),
relativePathPrefix));
List<String> partitionPaths = new CopyOnWriteArrayList<>();
+ int currentPartitionLevel = -1;
+ boolean needPushDownExpressions;
+ Expression fullBoundExpr;
+    // Unlike `HoodieBackedTableMetadata`, we don't know the exact
partition levels here.
+    // Given that partition values may contain `/`, which could
affect
+    // getting the right `partitionValue` when listing paths, we
have
+    // to be stricter and require `urlEncodePartitioningEnabled` to be
enabled.
+ // TODO better enable urlEncodePartitioningEnabled if
hiveStylePartitioningEnabled is enabled?
+ if (hiveStylePartitioningEnabled && urlEncodePartitioningEnabled
+ && pushedExpr != null && partitionFields != null) {
+ currentPartitionLevel = getPathPartitionLevel(partitionFields,
relativePathPrefix);
+ needPushDownExpressions = true;
+ fullBoundExpr = pushedExpr.accept(new BindVisitor(partitionFields,
caseSensitive));
+ } else {
+ fullBoundExpr = Predicates.alwaysTrue();
+ needPushDownExpressions = false;
+ }
+
while (!pathsToList.isEmpty()) {
// TODO: Get the parallelism from HoodieWriteConfig
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM,
pathsToList.size());
@@ -116,7 +177,7 @@ public class FileSystemBackedTableMetadata implements
HoodieTableMetadata {
List<Pair<Option<String>, Option<Path>>> result =
engineContext.flatMap(pathsToList, path -> {
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) {
- return
Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(new
Path(datasetBasePath), path)), Option.empty()));
+ return
Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(),
path)), Option.empty()));
}
return Arrays.stream(fileSystem.listStatus(path, p -> {
try {
@@ -129,10 +190,34 @@ public class FileSystemBackedTableMetadata implements
HoodieTableMetadata {
}, listingParallelism);
pathsToList.clear();
- partitionPaths.addAll(result.stream().filter(entry ->
entry.getKey().isPresent()).map(entry -> entry.getKey().get())
+ partitionPaths.addAll(result.stream().filter(entry ->
entry.getKey().isPresent())
+ .map(entry -> entry.getKey().get())
+ .filter(relativePartitionPath -> fullBoundExpr instanceof
Predicates.TrueExpression
+ || (Boolean) fullBoundExpr.eval(
+ extractPartitionValues(partitionFields,
relativePartitionPath, urlEncodePartitioningEnabled)))
.collect(Collectors.toList()));
+ Expression partialBoundExpr;
+ // If partitionPaths is nonEmpty, we're already at the last path level,
and all paths
+ // are filtered already.
+ if (needPushDownExpressions && partitionPaths.isEmpty()) {
+ // Here we assume the path level matches the number of partition
columns, so we'll rebuild
+ // new schema based on current path level.
+ // e.g. partition columns are <region, date, hh>, if we're listing the
second level, then
+ // currentSchema would be <region, date>
+ // `PartialBindVisitor` will bind reference if it can be found from
`currentSchema`, otherwise
+ // will change the expression to `alwaysTrue`. Can see
`PartialBindVisitor` for details.
+ Types.RecordType currentSchema =
Types.RecordType.get(partitionFields.fields().subList(0,
++currentPartitionLevel));
+ PartialBindVisitor partialBindVisitor = new
PartialBindVisitor(currentSchema, caseSensitive);
+ partialBoundExpr = pushedExpr.accept(partialBindVisitor);
+ } else {
+ partialBoundExpr = Predicates.alwaysTrue();
+ }
+
pathsToList.addAll(result.stream().filter(entry ->
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
+ .filter(path -> partialBoundExpr instanceof
Predicates.TrueExpression
+ || (Boolean) partialBoundExpr.eval(
+ extractPartitionValues(partitionFields,
FSUtils.getRelativePartitionPath(dataBasePath.get(), path),
urlEncodePartitioningEnabled)))
.collect(Collectors.toList()));
}
return partitionPaths;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index b0e9bcaaab2..9a27d2cfbca 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -46,6 +46,9 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.expression.BindVisitor;
+import org.apache.hudi.expression.Expression;
+import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieSeekingFileReader;
import org.apache.hudi.util.Transient;
@@ -143,6 +146,26 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return Option.ofNullable(recordsByKeys.get(key));
}
+ @Override
+ public List<String>
getPartitionPathWithPathPrefixUsingFilterExpression(List<String>
relativePathPrefixes,
+
Types.RecordType partitionFields,
+
Expression expression) throws IOException {
+ Expression boundedExpr = expression.accept(new
BindVisitor(partitionFields, caseSensitive));
+ List<String> selectedPartitionPaths =
getPartitionPathWithPathPrefixes(relativePathPrefixes);
+
+ // Can only prune partitions if the number of partition levels matches
partition fields
+ // Here we'll check the first selected partition to see whether the
numbers match.
+ if (hiveStylePartitioningEnabled
+ && getPathPartitionLevel(partitionFields,
selectedPartitionPaths.get(0)) == partitionFields.fields().size()) {
+ return selectedPartitionPaths.stream()
+ .filter(p ->
+ (boolean)
boundedExpr.eval(extractPartitionValues(partitionFields, p,
urlEncodePartitioningEnabled)))
+ .collect(Collectors.toList());
+ }
+
+ return selectedPartitionPaths;
+ }
+
@Override
public List<String> getPartitionPathWithPathPrefixes(List<String>
relativePathPrefixes) throws IOException {
// TODO: consider skipping this method for non-partitioned table and
simplify the checks
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index ac17eed001b..0ba197a5c68 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -33,6 +33,8 @@ import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.expression.Expression;
+import org.apache.hudi.internal.schema.Types;
import java.io.IOException;
import java.io.Serializable;
@@ -146,6 +148,14 @@ public interface HoodieTableMetadata extends Serializable,
AutoCloseable {
*/
FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException;
+ /**
+ * Retrieve the paths of partitions under the provided sub-directories,
+ * and try to filter these partitions using the provided {@link Expression}.
+ */
+ List<String>
getPartitionPathWithPathPrefixUsingFilterExpression(List<String>
relativePathPrefixes,
+
Types.RecordType partitionFields,
+ Expression
expression) throws IOException;
+
/**
* Fetches all partition paths that are the sub-directories of the list of
provided (relative) paths.
* <p>
diff --git
a/hudi-common/src/test/java/org/apache/hudi/expression/TestPartialBindVisitor.java
b/hudi-common/src/test/java/org/apache/hudi/expression/TestPartialBindVisitor.java
new file mode 100644
index 00000000000..c7e75711823
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/expression/TestPartialBindVisitor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.internal.schema.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class TestPartialBindVisitor {
+
+ private static Types.RecordType schema;
+ @BeforeAll
+ public static void init() {
+ ArrayList<Types.Field> fields = new ArrayList<>(5);
+ fields.add(Types.Field.get(0, true, "a", Types.StringType.get()));
+ fields.add(Types.Field.get(1, true, "b", Types.DateType.get()));
+ fields.add(Types.Field.get(2, true, "c", Types.IntType.get()));
+ fields.add(Types.Field.get(3, true, "d", Types.LongType.get()));
+ fields.add(Types.Field.get(4, false, "f", Types.BooleanType.get()));
+ schema = Types.RecordType.get(fields, "schema");
+ }
+
+ @Test
+ public void testPartialBindIfAllExisting() {
+ PartialBindVisitor partialBindVisitor = new PartialBindVisitor(schema,
false);
+
+ Predicates.BinaryComparison eq = Predicates.eq(new NameReference("a"),
+ Literal.from("Jane"));
+ Predicates.BinaryComparison gt = Predicates.gt(new NameReference("c"),
+ Literal.from(10));
+ Predicates.In in = Predicates.in(new NameReference("d"),
+ Arrays.asList(Literal.from(10L), Literal.from(13L)));
+
+ Predicates.And expr = Predicates.and(eq, Predicates.or(gt, in));
+ Expression binded = expr.accept(partialBindVisitor);
+
+ Assertions.assertTrue((Boolean) binded.eval(new
ArrayData(Arrays.asList("Jane", "2023-04-02", 15, 5L, false))));
+ Assertions.assertTrue((Boolean) binded.eval(new
ArrayData(Arrays.asList("Jane", "2023-04-02", 5, 10L, false))));
+ Assertions.assertFalse((Boolean) binded.eval(new
ArrayData(Arrays.asList("Lone", "2023-04-02", 15, 5L, false))));
+ Assertions.assertFalse((Boolean) binded.eval(new
ArrayData(Arrays.asList("Lone", "2023-04-02", 10, 5L, false))));
+ }
+
+ @Test
+ public void testPartialBindIfFieldMissing() {
+ PartialBindVisitor partialBindVisitor = new PartialBindVisitor(schema,
false);
+
+ Predicates.BinaryComparison eq = Predicates.eq(new NameReference("a"),
+ Literal.from("Jane"));
+ Predicates.BinaryComparison lt = Predicates.lt(new NameReference("m"),
+ Literal.from(10));
+ Predicates.BinaryComparison gteq = Predicates.gteq(new NameReference("d"),
+ Literal.from(10L));
+
+ Predicates.And expr = Predicates.and(eq, Predicates.or(lt, gteq));
+    // Since attribute m does not exist in the schema, the OR expression is
always true,
+    // so the expression is optimized to only consider the EQ expression
+ Expression binded = expr.accept(partialBindVisitor);
+
+ Assertions.assertTrue((Boolean) binded.eval(new
ArrayData(Arrays.asList("Jane", "2023-04-02", 15, 5L, false))));
+ Assertions.assertFalse((Boolean) binded.eval(new
ArrayData(Arrays.asList("Lone", "2023-04-02", 15, 5L, false))));
+ Assertions.assertFalse((Boolean) binded.eval(new
ArrayData(Arrays.asList("Lone", "2023-04-02", 10, 5L, false))));
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
index 338e892f48c..6c5fcb7049c 100644
---
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
@@ -331,7 +331,7 @@ public class TestAvroSchemaEvolutionUtils {
avroRecord.put("preferences", preferencesRecord);
// fill mapType
Map<String, GenericData.Record> locations = new HashMap<>();
- Schema mapSchema =
AvroInternalSchemaConverter.convert(((Types.MapType)record.field("locations").type()).valueType(),
"test1.locations");
+ Schema mapSchema =
AvroInternalSchemaConverter.convert(((Types.MapType)record.fieldByNameCaseInsensitive("locations").type()).valueType(),
"test1.locations");
GenericData.Record locationsValue = new GenericData.Record(mapSchema);
locationsValue.put("lat", 1.2f);
locationsValue.put("long", 1.4f);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java
index db7608cea96..799ff7e7d23 100644
---
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java
+++
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java
@@ -71,7 +71,7 @@ public class TestFileSystemBackedTableMetadata extends
HoodieCommonTestHarness {
hoodieTestTable.addCommit("100").withBaseFilesInPartition(DEFAULT_PARTITION,
IntStream.range(0, 10).toArray());
HoodieLocalEngineContext localEngineContext = new
HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
- new FileSystemBackedTableMetadata(localEngineContext, new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
+ new FileSystemBackedTableMetadata(localEngineContext,
metaClient.getTableConfig(), new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertEquals(0,
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10,
fileSystemBackedTableMetadata.getAllFilesInPartition(new
Path(basePath)).length);
Assertions.assertEquals(10,
fileSystemBackedTableMetadata.getAllFilesInPartitions(
@@ -96,7 +96,7 @@ public class TestFileSystemBackedTableMetadata extends
HoodieCommonTestHarness {
});
HoodieLocalEngineContext localEngineContext = new
HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
- new FileSystemBackedTableMetadata(localEngineContext, new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, true);
+ new FileSystemBackedTableMetadata(localEngineContext,
metaClient.getTableConfig(), new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, true);
Assertions.assertEquals(3,
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10,
fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" +
DATE_PARTITIONS.get(0))).length);
@@ -127,7 +127,7 @@ public class TestFileSystemBackedTableMetadata extends
HoodieCommonTestHarness {
});
HoodieLocalEngineContext localEngineContext = new
HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
- new FileSystemBackedTableMetadata(localEngineContext, new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
+ new FileSystemBackedTableMetadata(localEngineContext,
metaClient.getTableConfig(), new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertEquals(3,
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
List<String> fullPartitionPaths = DATE_PARTITIONS.stream().map(p ->
basePath + "/" + p).collect(Collectors.toList());
@@ -152,7 +152,7 @@ public class TestFileSystemBackedTableMetadata extends
HoodieCommonTestHarness {
});
HoodieLocalEngineContext localEngineContext = new
HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
- new FileSystemBackedTableMetadata(localEngineContext, new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
+ new FileSystemBackedTableMetadata(localEngineContext,
metaClient.getTableConfig(), new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertEquals(3,
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10,
fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" +
ONE_LEVEL_PARTITIONS.get(0))).length);
@@ -178,7 +178,7 @@ public class TestFileSystemBackedTableMetadata extends
HoodieCommonTestHarness {
});
HoodieLocalEngineContext localEngineContext = new
HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
- new FileSystemBackedTableMetadata(localEngineContext, new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
+ new FileSystemBackedTableMetadata(localEngineContext,
metaClient.getTableConfig(), new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertEquals(3,
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10,
fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" +
MULTI_LEVEL_PARTITIONS.get(0))).length);
@@ -203,7 +203,7 @@ public class TestFileSystemBackedTableMetadata extends
HoodieCommonTestHarness {
});
HoodieLocalEngineContext localEngineContext = new
HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
- new FileSystemBackedTableMetadata(localEngineContext, new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
+ new FileSystemBackedTableMetadata(localEngineContext,
metaClient.getTableConfig(), new
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertEquals(3,
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(0,
fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" +
MULTI_LEVEL_PARTITIONS.get(0))).length);
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkFilterHelper.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkFilterHelper.scala
new file mode 100644
index 00000000000..5a9bc29089e
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkFilterHelper.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.expression.{Predicates, Expression, Literal,
NameReference}
+import org.apache.hudi.internal.schema.{Type, Types}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDate}
+import scala.jdk.CollectionConverters.seqAsJavaListConverter
+
+object SparkFilterHelper {
+
+ def convertFilters(filters: Seq[Filter]): Expression = {
+ filters.flatMap(convertFilter)
+ .reduceLeftOption(Predicates.and)
+ .getOrElse(Predicates.alwaysTrue())
+ }
+
+ def convertFilter(filter: Filter): Option[Expression] = filter match {
+ case EqualTo(attribute, value) =>
+ Some(Predicates.eq(new NameReference(attribute), toLiteral(value)))
+ case EqualNullSafe(attribute, value) =>
+ Some(Predicates.eq(new NameReference(attribute), toLiteral(value)))
+ case LessThan(attribute, value) =>
+ Some(Predicates.lt(new NameReference(attribute), toLiteral(value)))
+ case LessThanOrEqual(attribute, value) =>
+ Some(Predicates.lteq(new NameReference(attribute), toLiteral(value)))
+ case GreaterThan(attribute, value) =>
+ Some(Predicates.gt(new NameReference(attribute), toLiteral(value)))
+ case GreaterThanOrEqual(attribute, value) =>
+ Some(Predicates.gteq(new NameReference(attribute), toLiteral(value)))
+ case In(attribute, values) =>
+ Some(Predicates.in(new NameReference(attribute),
values.map(toLiteral(_).asInstanceOf[Expression]).toList.asJava))
+ case And(left, right) =>
+ for {
+ convertedLeft <- convertFilter(left)
+ convertedRight <- convertFilter(right)
+ } yield Predicates.and(convertedLeft, convertedRight)
+ case Or(left, right) =>
+ for {
+ convertedLeft <- convertFilter(left)
+ convertedRight <- convertFilter(right)
+ } yield Predicates.or(convertedLeft, convertedRight)
+ case StringStartsWith(attribute, value) =>
+ Some(Predicates.startsWith(new NameReference(attribute),
toLiteral(value)))
+ case StringContains(attribute, value) =>
+ Some(Predicates.contains(new NameReference(attribute), toLiteral(value)))
+ case Not(child) =>
+ convertFilter(child).map(Predicates.not)
+ case IsNull(attribute) =>
+ Some(Predicates.isNull(new NameReference(attribute)))
+ case IsNotNull(attribute) =>
+ Some(Predicates.isNotNull(new NameReference(attribute)))
+ case _ =>
+ None
+ }
+
+ def toLiteral(value: Any): Literal[_] = {
+ value match {
+ case timestamp : Timestamp =>
+ new Literal(DateTimeUtils.fromJavaTimestamp(timestamp),
Types.TimestampType.get())
+ case date: Date =>
+ new Literal(DateTimeUtils.fromJavaDate(date), Types.DateType.get())
+ case instant: Instant =>
+ new Literal(DateTimeUtils.instantToMicros(instant),
Types.TimestampType.get())
+ case localDate: LocalDate =>
+ new Literal(Math.toIntExact(localDate.toEpochDay),
Types.TimestampType.get())
+ case _ =>
+ Literal.from(value)
+ }
+ }
+
+ def convertDataType(sparkType: DataType): Type = sparkType match {
+ case StructType(fields) =>
+ val convertedFields = fields.zipWithIndex.map {
+ case (field, idx) =>
+ Types.Field.get(idx, field.nullable, field.name,
convertDataType(field.dataType), field.getComment().orNull)
+ }.toList.asJava
+ Types.RecordType.get(convertedFields)
+ case BooleanType =>
+ Types.BooleanType.get()
+ case IntegerType | ShortType | ByteType =>
+ Types.IntType.get()
+ case LongType =>
+ Types.LongType.get()
+ case FloatType =>
+ Types.FloatType.get()
+ case DoubleType =>
+ Types.DoubleType.get()
+ case StringType | CharType(_) | VarcharType(_) =>
+ Types.StringType.get()
+ case DateType =>
+ Types.DateType.get()
+ case TimestampType =>
+ Types.TimestampType.get()
+ case _type: DecimalType =>
+ Types.DecimalType.get(_type.precision, _type.scale)
+ case _ =>
+ throw new UnsupportedOperationException(s"Cannot convert spark type
$sparkType to the relate HUDI type")
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index c76af7b39ce..35ef3e9f066 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -31,6 +31,8 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.hadoop.CachingPath
import org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe
+import org.apache.hudi.internal.schema.Types.RecordType
+import org.apache.hudi.internal.schema.utils.Conversions
import org.apache.hudi.keygen.{StringPartitionPathFormatter,
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.util.JFunction
import org.apache.spark.api.java.JavaSparkContext
@@ -44,9 +46,11 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+import java.util.Collections
import javax.annotation.concurrent.NotThreadSafe
import scala.collection.JavaConverters._
import scala.language.implicitConversions
+import scala.util.{Failure, Success, Try}
/**
* Implementation of the [[BaseHoodieTableFileIndex]] for Spark
@@ -225,7 +229,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
logInfo("Partition path prefix analysis is disabled; falling back to
fetching all partitions")
getAllQueryPartitionPaths.asScala
} else {
- tryListByPartitionPathPrefix(partitionColumnNames,
partitionPruningPredicates)
+ tryPushDownPartitionPredicates(partitionColumnNames,
partitionPruningPredicates)
}
// NOTE: In some cases, like for ex, when non-encoded slash '/' is used
w/in the partition column's value,
@@ -260,7 +264,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
// NOTE: Here we try to to achieve efficiency in avoiding necessity to
recursively list deep folder structures of
// partitioned tables w/ multiple partition columns, by carefully
analyzing provided partition predicates:
//
- // In cases when partition-predicates have
+ // 1. Firstly, when partition-predicates have
// - The form of equality predicates w/ static literals (for ex,
like `date = '2022-01-01'`)
// - Fully specified proper prefix of the partition schema (ie fully
binding first N columns
// of the partition schema adhering to hereby described rules)
@@ -274,11 +278,12 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
//
// country_code: string (for ex, 'us')
// date: string (for ex, '2022-01-01')
+ // hour: string (for ex, '08')
//
// Table's folder structure:
// us/
- // |- 2022-01-01/
- // |- 2022-01-02/
+ // |- 2022-01-01/06
+ // |- 2022-01-02/07
// ...
//
// In case we have incoming query specifies following predicates:
@@ -286,7 +291,15 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
// `... WHERE country_code = 'us' AND date = '2022-01-01'`
//
// We can deduce full partition-path w/o doing a single listing:
`us/2022-01-01`
- private def tryListByPartitionPathPrefix(partitionColumnNames: Seq[String],
partitionColumnPredicates: Seq[Expression]) = {
+ //
+ // 2. Try to push down all partition predicates when listing the sub-folder.
+ // In case the incoming query specifies the following predicates:
+ //
+ // `... WHERE country_code = 'us' AND date = '2022-01-01' AND hour = '06'`
+ //
+ // We can deduce full partition-path w/o doing a single listing:
`us/2022-01-01`, and then push down
+ // these filters when listing `us/2022-01-01` to get the directory
'us/2022-01-01/06'
+ private def tryPushDownPartitionPredicates(partitionColumnNames:
Seq[String], partitionColumnPredicates: Seq[Expression]): Seq[PartitionPath] = {
// Static partition-path prefix is defined as a prefix of the full
partition-path where only
// first N partition columns (in-order) have proper (static) values bound
in equality predicates,
// allowing in turn to build such prefix to be used in subsequent filtering
@@ -301,24 +314,59 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
.map(colName => (colName, (staticPartitionColumnValuesMap(colName)._1,
staticPartitionColumnValuesMap(colName)._2.get)))
}
- if (staticPartitionColumnNameValuePairs.isEmpty) {
- logDebug("Unable to compose relative partition path prefix from the
predicates; falling back to fetching all partitions")
- getAllQueryPartitionPaths.asScala
- } else {
- // Based on the static partition-column name-value pairs, we'll try to
compose static partition-path
- // prefix to try to reduce the scope of the required file-listing
- val relativePartitionPathPrefix =
composeRelativePartitionPath(staticPartitionColumnNameValuePairs)
-
- if (!metaClient.getFs.exists(new Path(getBasePath,
relativePartitionPathPrefix))) {
- Seq()
- } else if (staticPartitionColumnNameValuePairs.length ==
partitionColumnNames.length) {
- // In case composed partition path is complete, we can return it
directly avoiding extra listing operation
- Seq(new PartitionPath(relativePartitionPathPrefix,
staticPartitionColumnNameValuePairs.map(_._2._2.asInstanceOf[AnyRef]).toArray))
- } else {
- // Otherwise, compile extracted partition values (from query
predicates) into a sub-path which is a prefix
- // of the complete partition path, do listing for this prefix-path only
-
listPartitionPaths(Seq(relativePartitionPathPrefix).toList.asJava).asScala
+ val hiveStylePartitioning =
metaClient.getTableConfig.getHiveStylePartitioningEnable.toBoolean
+ val urlEncodePartitioning =
metaClient.getTableConfig.getUrlEncodePartitioning.toBoolean
+
+ val partitionTypesOption = if (hiveStylePartitioning &&
urlEncodePartitioning) {
+ Try {
+
SparkFilterHelper.convertDataType(partitionSchema).asInstanceOf[RecordType]
+ } match {
+ case Success(partitionRecordType)
+ if partitionRecordType.fields().size() ==
_partitionSchemaFromProperties.size
+ &&
Conversions.isPartitionSchemaSupportedConversion(partitionRecordType) =>
+ Some(partitionRecordType)
+ case _ =>
+ None
}
+ } else {
+ // Avoid converting partition schemas if hiveStylePartitioning &
urlEncodePartitioning are not enabled.
+ None
+ }
+
+ (staticPartitionColumnNameValuePairs.isEmpty, partitionTypesOption) match {
+ case (true, Some(partitionTypes)) =>
+ // Push down partition filters without pathPrefix
+ val convertedFilters = SparkFilterHelper.convertFilters(
+ partitionColumnPredicates.flatMap {
+ expr => sparkAdapter.translateFilter(expr)
+ })
+ listPartitionPaths(Collections.singletonList(""), partitionTypes,
convertedFilters).asScala
+ case (true, None) =>
+ logDebug("Unable to compose relative partition path prefix from the
predicates; falling back to fetching all partitions")
+ getAllQueryPartitionPaths.asScala
+ case (false, _) =>
+ // Based on the static partition-column name-value pairs, we'll try to
compose static partition-path
+ // prefix to try to reduce the scope of the required file-listing
+ val relativePartitionPathPrefix =
composeRelativePartitionPath(staticPartitionColumnNameValuePairs)
+
+ if (!metaClient.getFs.exists(new Path(getBasePath,
relativePartitionPathPrefix))) {
+ Seq()
+ } else if (staticPartitionColumnNameValuePairs.length ==
partitionColumnNames.length) {
+ // In case composed partition path is complete, we can return it
directly avoiding extra listing operation
+ Seq(new PartitionPath(relativePartitionPathPrefix,
staticPartitionColumnNameValuePairs.map(_._2._2.asInstanceOf[AnyRef]).toArray))
+ } else {
+ partitionTypesOption.map { partitionTypes =>
+ // Try to combine the path prefix and the filters to gain better
performance
+ val convertedFilters = SparkFilterHelper.convertFilters(
+ partitionColumnPredicates.flatMap {
+ expr => sparkAdapter.translateFilter(expr)
+ })
+ listPartitionPaths(Seq(relativePartitionPathPrefix).toList.asJava,
partitionTypes, convertedFilters).asScala
+ }.getOrElse {
+ log.warn("Failed to convert to Hudi data types; falling back to
listing by prefix directly")
+
listPartitionPaths(Seq(relativePartitionPathPrefix).toList.asJava).asScala
+ }
+ }
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index f8f082489ce..ba5c2edb2d1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -383,6 +383,66 @@ class TestHoodieFileIndex extends
HoodieSparkClientTestBase with ScalaAssertionS
}
}
+ /**
+ * This test mainly ensures all non-partition-prefix filters can be
pushed down successfully
+ */
+ @ParameterizedTest
+ @CsvSource(value = Array("true, false", "false, false", "true, true",
"false, true"))
+ def
testPartitionPruneWithMultiplePartitionColumnsWithComplexExpression(useMetadataTable:
Boolean,
+
complexExpressionPushDown: Boolean): Unit = {
+ val _spark = spark
+ import _spark.implicits._
+
+ val partitionNames = Seq("prefix", "dt", "hh", "country")
+ val writerOpts: Map[String, String] = commonOpts ++ Map(
+ DataSourceWriteOptions.OPERATION.key ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ RECORDKEY_FIELD.key -> "id",
+ PRECOMBINE_FIELD.key -> "version",
+ PARTITIONPATH_FIELD.key -> partitionNames.mkString(","),
+ HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString
+ )
+
+ val readerOpts: Map[String, String] = queryOpts ++ Map(
+ HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString,
+ DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key -> "lazy",
+
DataSourceReadOptions.FILE_INDEX_LISTING_PARTITION_PATH_PREFIX_ANALYSIS_ENABLED.key
-> "true"
+ )
+
+ // Add a prefix "default" to ensure `PushDownByPartitionPrefix` does not take effect
+ val inputDF1 = (for (i <- 0 until 10) yield (i, s"a$i", 10 + i, 10000,
+ "default", s"2021-03-0${i % 2 + 1}", i % 6 + 1, if (i % 2 == 0) "CN"
else "SG"))
+ .toDF("id", "name", "price", "version", "prefix", "dt", "hh", "country")
+
+ inputDF1.write.format("hudi")
+ .options(writerOpts)
+ .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key,
complexExpressionPushDown.toString)
+ .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key,
complexExpressionPushDown.toString)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ // NOTE: We're init-ing file-index in advance to additionally test
refreshing capability
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val fileIndex = HoodieFileIndex(spark, metaClient, None, readerOpts)
+
+ val partitionFilters = EqualTo(attribute("hh"), Literal.create(5))
+
+ val partitionAndFilesAfterPrune =
fileIndex.listFiles(Seq(partitionFilters), Seq.empty)
+ assertEquals(1, partitionAndFilesAfterPrune.size)
+
+ assertEquals(fileIndex.areAllPartitionPathsCached(),
!complexExpressionPushDown)
+
+ val PartitionDirectory(partitionActualValues, filesAfterPrune) =
partitionAndFilesAfterPrune.head
+ val partitionExpectValues = Seq("default", "2021-03-01", "5", "CN")
+ assertEquals(partitionExpectValues.mkString(","),
partitionActualValues.toSeq(Seq(StringType)).mkString(","))
+ assertEquals(getFileCountInPartitionPath(makePartitionPath(partitionNames,
partitionExpectValues, complexExpressionPushDown)),
+ filesAfterPrune.size)
+
+ val readDF = spark.read.format("hudi").options(readerOpts).load()
+
+ assertEquals(10, readDF.count())
+ assertEquals(1, readDF.filter("hh = 5").count())
+ }
+
@ParameterizedTest
@CsvSource(value = Array("true", "false"))
def
testFileListingPartitionPrefixAnalysis(enablePartitionPathPrefixAnalysis:
Boolean): Unit = {
@@ -686,6 +746,18 @@ class TestHoodieFileIndex extends
HoodieSparkClientTestBase with ScalaAssertionS
private def getFileCountInPartitionPaths(partitionPaths: String*): Int = {
partitionPaths.map(getFileCountInPartitionPath).sum
}
+
+ private def makePartitionPath(partitionNames: Seq[String],
+ partitionValues: Seq[String],
+ hiveStylePartitioning: Boolean): String = {
+ if (hiveStylePartitioning) {
+ partitionNames.zip(partitionValues).map {
+ case (name, value) => s"$name=$value"
+ }.mkString(Path.SEPARATOR)
+ } else {
+ partitionValues.mkString(Path.SEPARATOR)
+ }
+ }
}
object TestHoodieFileIndex {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestSparkFilterHelper.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestSparkFilterHelper.scala
new file mode 100644
index 00000000000..7d53596ac10
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestSparkFilterHelper.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.SparkFilterHelper.convertFilter
+import org.apache.hudi.expression.{Expression, NameReference, Predicates,
Literal => HLiteral}
+import org.apache.hudi.testutils.HoodieClientTestHarness
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.junit.jupiter.api.{Assertions, Test}
+
+import scala.jdk.CollectionConverters.seqAsJavaListConverter
+
+class TestSparkFilterHelper extends HoodieClientTestHarness with
SparkAdapterSupport {
+
+ @Test
+ def testConvertInExpression(): Unit = {
+ val filterExpr = sparkAdapter.translateFilter(
+ expr("col1 IN (1, 2, 3)").expr.transformUp {
+ case UnresolvedAttribute(nameParts) =>
AttributeReference(nameParts.mkString("."), IntegerType)()
+ })
+
+ val result = SparkFilterHelper.convertFilter(filterExpr.get).get
+
+ val expected = Predicates.in(
+ new NameReference("col1"),
+ Seq(1, 2, 3).map(v => HLiteral.from(v).asInstanceOf[Expression]).asJava)
+
+ Assertions.assertEquals(result.toString, expected.toString)
+ }
+
+ @Test
+ def testConvertInSetExpression(): Unit = {
+ val filterExpr = sparkAdapter.translateFilter(
+ InSet(AttributeReference("col1", StringType)(), Set("value1", "value2",
"value3").map(UTF8String.fromString)))
+
+ val result = SparkFilterHelper.convertFilter(filterExpr.get).get
+
+ val expected = Predicates.in(
+ new NameReference("col1"),
+ Seq("value1", "value2", "value3").map(v =>
HLiteral.from(v).asInstanceOf[Expression]).asJava)
+
+ Assertions.assertEquals(result.toString, expected.toString)
+ }
+
+ @Test
+ def testConvertEqualToExpression(): Unit = {
+ val filter =
sparkAdapter.translateFilter(EqualTo(AttributeReference("col1", LongType)(),
Literal(1L)))
+ val result = convertFilter(filter.get).get
+
+ val expected = Predicates.eq(
+ new NameReference("col1"),
+ HLiteral.from(1L).asInstanceOf[Expression])
+
+ Assertions.assertEquals(result.toString, expected.toString)
+ }
+
+ @Test
+ def testConvertGreaterThanExpression(): Unit = {
+ val filter =
sparkAdapter.translateFilter(GreaterThan(AttributeReference("col3",
DoubleType)(), Literal(3.0D)))
+ val result = convertFilter(filter.get).get
+
+ val expected = Predicates.gt(
+ new NameReference("col3"),
+ HLiteral.from(3.0D).asInstanceOf[Expression])
+
+ Assertions.assertEquals(result.toString, expected.toString)
+ }
+
+ @Test
+ def testConvertGreaterThanOrEqualExpression(): Unit = {
+ val filter =
sparkAdapter.translateFilter(GreaterThanOrEqual(AttributeReference("col4",
FloatType)(), Literal(4.0f)))
+ val result = convertFilter(filter.get).get
+
+ val expected = Predicates.gteq(
+ new NameReference("col4"),
+ HLiteral.from(4.0f).asInstanceOf[Expression])
+
+ Assertions.assertEquals(result.toString, expected.toString)
+ }
+
+ @Test
+ def testConvertLessThanExpression(): Unit = {
+ val filter =
sparkAdapter.translateFilter(LessThan(AttributeReference("col5", StringType)(),
Literal("abc")))
+ val result = convertFilter(filter.get).get
+
+ val expected = Predicates.lt(
+ new NameReference("col5"),
+ HLiteral.from("abc").asInstanceOf[Expression])
+
+ Assertions.assertEquals(result.toString, expected.toString)
+ }
+
+ @Test
+ def testConvertLessThanOrEqualExpression(): Unit = {
+ val filter =
sparkAdapter.translateFilter(LessThanOrEqual(AttributeReference("col6",
BooleanType)(), Literal(true)))
+ val result = convertFilter(filter.get).get
+
+ val expected = Predicates.lteq(
+ new NameReference("col6"),
+ HLiteral.from(true).asInstanceOf[Expression])
+
+ Assertions.assertEquals(result.toString, expected.toString)
+ }
+
+ @Test
+ def testConvertStartsWithExpression(): Unit = {
+ val filter =
sparkAdapter.translateFilter(StartsWith(AttributeReference("col2",
StringType)(), Literal("prefix")))
+ val result = convertFilter(filter.get).get
+
+ val expected = Predicates.startsWith(
+ new NameReference("col2"),
+ HLiteral.from("prefix").asInstanceOf[Expression])
+
+ Assertions.assertEquals(result.toString, expected.toString)
+ }
+
+ @Test
+ def testConvertContainsExpression(): Unit = {
+ val filter =
sparkAdapter.translateFilter(Contains(AttributeReference("col2", StringType)(),
Literal("prefix")))
+ val result = convertFilter(filter.get).get
+
+ val expected = Predicates.contains(
+ new NameReference("col2"),
+ HLiteral.from("prefix").asInstanceOf[Expression])
+
+ Assertions.assertEquals(result.toString, expected.toString)
+ }
+
+ @Test
+ def testConvertAndExpression(): Unit = {
+ val filter = sparkAdapter.translateFilter(And(
+ EqualTo(AttributeReference("col1", IntegerType)(), Literal(1)),
+ GreaterThan(AttributeReference("col2", FloatType)(), Literal(2.0F))))
+ val result = convertFilter(filter.get).get
+
+ val expected = Predicates.and(
+ Predicates.eq(
+ new NameReference("col1"),
+ HLiteral.from(1).asInstanceOf[Expression]),
+ Predicates.gt(
+ new NameReference("col2"),
+ HLiteral.from(2.0F).asInstanceOf[Expression]))
+
+ Assertions.assertEquals(result.toString, expected.toString)
+ }
+
+ @Test
+ def testConvertOrEqualExpression(): Unit = {
+ val filter = sparkAdapter.translateFilter(Or(
+ LessThan(AttributeReference("col1", ShortType)(), Literal(10.toShort)),
+ GreaterThanOrEqual(AttributeReference("col2", DoubleType)(),
Literal(100.0D))))
+ val result = convertFilter(filter.get).get
+
+ val expected = Predicates.or(
+ Predicates.lt(
+ new NameReference("col1"),
+ HLiteral.from(10.toShort).asInstanceOf[Expression]),
+ Predicates.gteq(
+ new NameReference("col2"),
+ HLiteral.from(100.0D).asInstanceOf[Expression]))
+
+ Assertions.assertEquals(result.toString, expected.toString)
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala
index be1dae8cc27..35afff918b9 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala
@@ -220,7 +220,7 @@ class TestHoodieInternalRowUtils extends FunSuite with
Matchers with BeforeAndAf
avroRecord.put("preferences", preferencesRecord)
// fill mapType
val locations = new HashMap[String, GenericData.Record]
- val mapSchema =
AvroInternalSchemaConverter.convert(record.field("locations").`type`.asInstanceOf[Types.MapType].valueType,
"test1_locations")
+ val mapSchema =
AvroInternalSchemaConverter.convert(record.fieldByNameCaseInsensitive("locations").`type`.asInstanceOf[Types.MapType].valueType,
"test1_locations")
val locationsValue: GenericData.Record = new GenericData.Record(mapSchema)
locationsValue.put("lat", 1.2f)
locationsValue.put("long", 1.4f)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
index 0467b6664c6..e2635c0cba8 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
@@ -55,6 +55,36 @@ class TestLazyPartitionPathFetching extends
HoodieSparkSqlTestBase {
}
}
+ test("Test querying with date column + partition pruning") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | grass_date date
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | preCombineField = 'ts'
+ | )
+ | PARTITIONED BY (grass_date)
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000,
date('2023-02-27'))")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000,
date('2023-02-28'))")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000,
date('2023-03-01'))")
+
+ checkAnswer(s"select id, name, price, ts from $tableName where
grass_date = date'2023-03-01' order by id")(
+ Seq(3, "a3", 10.0, 1000)
+ )
+ }
+ }
+
test("Test querying with date column + partition pruning (multi-level
partitioning)") {
withTempDir { tmp =>
val tableName = generateTableName
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartitionPushDownWhenListingPaths.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartitionPushDownWhenListingPaths.scala
new file mode 100644
index 00000000000..1b5e590913f
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartitionPushDownWhenListingPaths.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+
+class TestPartitionPushDownWhenListingPaths extends HoodieSparkSqlTestBase {
+
+ test("Test push down different partitions") {
+ Seq("true", "false").foreach { enableMetadata =>
+ withSQLConf(HoodieMetadataConfig.ENABLE.key -> enableMetadata) {
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | date_par date,
+ | country string,
+ | hour int,
+ | longValue long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts',
+ | hoodie.datasource.write.hive_style_partitioning = 'true',
+ | hoodie.datasource.write.partitionpath.urlencode = 'true'
+ | )
+ | PARTITIONED BY (date_par, country, hour,
longValue)""".stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, date
'2023-02-27', 'ID', 1, 102345L)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, date
'2023-02-28', 'US', 4, 102346L)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, date
'2023-03-01', 'CN', 10, 102347L)")
+
+ // Only filter one partition column
+ checkAnswer(s"select id, name, price, ts from $tableName where
date_par = date'2023-03-01' order by id")(
+ Seq(3, "a3", 10.0, 1000)
+ )
+
+ // Filter with And operation
+ checkAnswer(s"select id, name, price, ts from $tableName where
date_par = date'2023-02-28' and hour = 4 order by id")(
+ Seq(2, "a2", 10.0, 1000)
+ )
+
+ // Filter with Or operation
+ checkAnswer(s"select id, name, price, ts from $tableName where
date_par = date'2023-02-28' or country = 'CN' order by id")(
+ Seq(2, "a2", 10.0, 1000),
+ Seq(3, "a3", 10.0, 1000)
+ )
+
+ // Filter with GT
+ checkAnswer(s"select id, name, price, ts from $tableName where
date_par > date'2023-02-27' order by id")(
+ Seq(2, "a2", 10.0, 1000),
+ Seq(3, "a3", 10.0, 1000)
+ )
+
+ // Filter with LT
+ checkAnswer(s"select id, name, price, ts from $tableName where
longValue < 102346L order by id")(
+ Seq(1, "a1", 10.0, 1000)
+ )
+
+ // Filter with EQ
+ checkAnswer(s"select id, name, price, ts from $tableName where
longValue = 102346L order by id")(
+ Seq(2, "a2", 10.0, 1000)
+ )
+
+ // Filter with GT_EQ
+ checkAnswer(s"select id, name, price, ts from $tableName where
date_par >= date'2023-02-27' order by id")(
+ Seq(1, "a1", 10.0, 1000),
+ Seq(2, "a2", 10.0, 1000),
+ Seq(3, "a3", 10.0, 1000)
+ )
+
+ // Filter with LT_EQ
+ checkAnswer(s"select id, name, price, ts from $tableName where
date_par <= date'2023-02-27' order by id")(
+ Seq(1, "a1", 10.0, 1000)
+ )
+
+ // Filter with In operation
+ checkAnswer(s"select id, name, price, ts from $tableName where
country in ('CN', 'US') order by id")(
+ Seq(2, "a2", 10.0, 1000),
+ Seq(3, "a3", 10.0, 1000)
+ )
+ }
+ }
+ }
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 9eddf81d121..cc2e25f9891 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -38,7 +38,7 @@ import
org.apache.spark.sql.execution.vectorized.MutableColumnarRow
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder,
StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
@@ -187,4 +187,17 @@ class Spark2Adapter extends SparkAdapter {
case OFF_HEAP => "OFF_HEAP"
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel:
$level")
}
+
+ /**
+ * Spark2 doesn't support nestedPredicatePushdown,
+ * so fail it if [[supportNestedPredicatePushdown]] is true here.
+ */
+ override def translateFilter(predicate: Expression,
+ supportNestedPredicatePushdown: Boolean =
false): Option[Filter] = {
+ if (supportNestedPredicatePushdown) {
+ throw new UnsupportedOperationException("Nested predicate push down is
not supported")
+ }
+
+ DataSourceStrategy.translateFilter(predicate)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index d5a35980526..ce9499ae7d2 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression,
InterpretedPredica
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hudi.SparkAdapter
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext,
SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel
@@ -92,4 +92,9 @@ abstract class BaseSpark3Adapter extends SparkAdapter with
Logging {
}
override def convertStorageLevelToString(level: StorageLevel): String
+
+ override def translateFilter(predicate: Expression,
+ supportNestedPredicatePushdown: Boolean =
false): Option[Filter] = {
+ DataSourceStrategy.translateFilter(predicate,
supportNestedPredicatePushdown)
+ }
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java
index f1d4724cc1b..f42b157727c 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java
@@ -18,13 +18,14 @@
package org.apache.hudi.hive.util;
-import org.apache.hudi.hive.expression.AttributeReferenceExpression;
-import org.apache.hudi.hive.expression.BinaryOperator;
-import org.apache.hudi.hive.expression.Expression;
-import org.apache.hudi.hive.expression.ExpressionVisitor;
-import org.apache.hudi.hive.expression.Literal;
-
-import java.util.Locale;
+import org.apache.hudi.expression.NameReference;
+import org.apache.hudi.expression.BoundReference;
+import org.apache.hudi.expression.Expression;
+import org.apache.hudi.expression.ExpressionVisitor;
+import org.apache.hudi.expression.Literal;
+import org.apache.hudi.expression.Predicate;
+import org.apache.hudi.expression.Predicates;
+import org.apache.hudi.internal.schema.Types;
public class FilterGenVisitor implements ExpressionVisitor<String> {
@@ -42,9 +43,10 @@ public class FilterGenVisitor implements
ExpressionVisitor<String> {
}
}
- private String visitAnd(Expression left, Expression right) {
- String leftResult = left.accept(this);
- String rightResult = right.accept(this);
+ @Override
+ public String visitAnd(Predicates.And and) {
+ String leftResult = and.getLeft().accept(this);
+ String rightResult = and.getRight().accept(this);
if (leftResult.isEmpty()) {
if (rightResult.isEmpty()) {
@@ -59,9 +61,10 @@ public class FilterGenVisitor implements
ExpressionVisitor<String> {
return "(" + makeBinaryOperatorString(leftResult, Expression.Operator.AND,
rightResult) + ")";
}
- private String visitOr(Expression left, Expression right) {
- String leftResult = left.accept(this);
- String rightResult = right.accept(this);
+ @Override
+ public String visitOr(Predicates.Or or) {
+ String leftResult = or.getLeft().accept(this);
+ String rightResult = or.getRight().accept(this);
if (!leftResult.isEmpty() && !rightResult.isEmpty()) {
return "(" + makeBinaryOperatorString(leftResult,
Expression.Operator.OR, rightResult) + ")";
@@ -81,39 +84,46 @@ public class FilterGenVisitor implements
ExpressionVisitor<String> {
}
@Override
- public String visitBinaryOperator(BinaryOperator expr) {
- switch (expr.getOperator()) {
- case AND:
- return visitAnd(expr.getLeft(), expr.getRight());
- case OR:
- return visitOr(expr.getLeft(), expr.getRight());
- case EQ:
- case GT:
- case LT:
- case GT_EQ:
- case LT_EQ:
- return visitBinaryComparator(expr.getLeft(), expr.getOperator(),
expr.getRight());
- default:
- return "";
+ public String visitPredicate(Predicate predicate) {
+ if (predicate instanceof Predicates.BinaryComparison) {
+ Predicates.BinaryComparison expr = (Predicates.BinaryComparison)
predicate;
+ return visitBinaryComparator(expr.getLeft(), expr.getOperator(),
expr.getRight());
}
+
+ return "";
+ }
+
+ @Override
+ public String alwaysTrue() {
+ return "";
+ }
+
+ @Override
+ public String alwaysFalse() {
+ return "";
}
@Override
public String visitLiteral(Literal literalExpr) {
- switch (literalExpr.getType().toLowerCase(Locale.ROOT)) {
- case HiveSchemaUtil.STRING_TYPE_NAME:
- return quoteStringLiteral(literalExpr.getValue());
- case HiveSchemaUtil.INT_TYPE_NAME:
- case HiveSchemaUtil.BIGINT_TYPE_NAME:
- case HiveSchemaUtil.DATE_TYPE_NAME:
- return literalExpr.getValue();
- default:
- return "";
+ if (literalExpr.getDataType() instanceof Types.StringType) {
+ return quoteStringLiteral(((Literal<String>)literalExpr).getValue());
}
+
+ if (literalExpr.getDataType() instanceof Types.IntType ||
literalExpr.getDataType() instanceof Types.LongType
+ || literalExpr.getDataType() instanceof Types.DateType) {
+ return literalExpr.getValue().toString();
+ }
+
+ return "";
}
@Override
- public String visitAttribute(AttributeReferenceExpression attribute) {
+ public String visitNameReference(NameReference attribute) {
return attribute.getName();
}
+
+ @Override
+ public String visitBoundReference(BoundReference boundReference) {
+ throw new UnsupportedOperationException("BoundReference cannot be used to
build filter string");
+ }
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java
index 8202408d8bd..9ff22d2d5dc 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java
@@ -20,12 +20,14 @@ package org.apache.hudi.hive.util;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.expression.Predicates;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
-import org.apache.hudi.hive.expression.AttributeReferenceExpression;
-import org.apache.hudi.hive.expression.BinaryOperator;
-import org.apache.hudi.hive.expression.Expression;
-import org.apache.hudi.hive.expression.Literal;
+import org.apache.hudi.expression.NameReference;
+import org.apache.hudi.expression.BinaryExpression;
+import org.apache.hudi.expression.Expression;
+import org.apache.hudi.expression.Literal;
+import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;
@@ -54,6 +56,28 @@ public class PartitionFilterGenerator {
}
};
+ private static final String UNSUPPORTED_TYPE_ERROR = "The value type: %s
doesn't support to "
+ + "be pushed down to HMS, acceptable types: " + String.join(",",
SUPPORT_TYPES);
+
+ private static Literal buildLiteralExpression(String fieldValue, String
fieldType) {
+ switch (fieldType.toLowerCase(Locale.ROOT)) {
+ case HiveSchemaUtil.INT_TYPE_NAME:
+ return new Literal<>(Integer.parseInt(fieldValue),
Types.IntType.get());
+ case HiveSchemaUtil.BIGINT_TYPE_NAME:
+ return new Literal<>(Long.parseLong(fieldValue), Types.LongType.get());
+ // TODO Handle Date value
+ case HiveSchemaUtil.DATE_TYPE_NAME:
+ return new Literal<>(fieldValue, Types.DateType.get());
+ case HiveSchemaUtil.STRING_TYPE_NAME:
+ return new Literal<>(fieldValue, Types.StringType.get());
+ case HiveSchemaUtil.BOOLEAN_TYPE_NAME:
+ return new Literal<>(Boolean.parseBoolean(fieldValue),
Types.BooleanType.get());
+ default:
+ throw new
IllegalArgumentException(String.format(UNSUPPORTED_TYPE_ERROR, fieldType));
+ }
+ }
+
+
/**
* Build expression from the Partition list. Here we're trying to match all
partitions.
*
@@ -68,11 +92,10 @@ public class PartitionFilterGenerator {
for (int i = 0; i < partitionFields.size(); i++) {
FieldSchema field = partitionFields.get(i);
- String value = partitionValues.get(i);
- BinaryOperator exp = BinaryOperator.eq(new
AttributeReferenceExpression(field.getName()),
- new Literal(value, field.getType()));
+ BinaryExpression exp = Predicates.eq(new
NameReference(field.getName()),
+ buildLiteralExpression(partitionValues.get(i), field.getType()));
if (root != null) {
- root = BinaryOperator.and(root, exp);
+ root = Predicates.and(root, exp);
} else {
root = exp;
}
@@ -82,7 +105,7 @@ public class PartitionFilterGenerator {
if (result == null) {
return expr;
} else {
- return BinaryOperator.or(result, expr);
+ return Predicates.or(result, expr);
}
});
}
@@ -129,8 +152,7 @@ public class PartitionFilterGenerator {
case HiveSchemaUtil.STRING_TYPE_NAME:
return s1.compareTo(s2);
default:
- throw new IllegalArgumentException("The value type: " + valueType +
" doesn't support to "
- + "be pushed down to HMS, acceptable types: " + String.join(",",
SUPPORT_TYPES));
+ throw new
IllegalArgumentException(String.format(UNSUPPORTED_TYPE_ERROR, valueType));
}
}
}
@@ -152,26 +174,26 @@ public class PartitionFilterGenerator {
String[] values = fieldWithValues.getValue();
if (values.length == 1) {
- return BinaryOperator.eq(new
AttributeReferenceExpression(fieldSchema.getName()),
- new Literal(values[0], fieldSchema.getType()));
+ return Predicates.eq(new NameReference(fieldSchema.getName()),
+ buildLiteralExpression(values[0], fieldSchema.getType()));
}
Arrays.sort(values, new ValueComparator(fieldSchema.getType()));
- return BinaryOperator.and(
- BinaryOperator.gteq(
- new AttributeReferenceExpression(fieldSchema.getName()),
- new Literal(values[0], fieldSchema.getType())),
- BinaryOperator.lteq(
- new AttributeReferenceExpression(fieldSchema.getName()),
- new Literal(values[values.length - 1], fieldSchema.getType())));
+ return Predicates.and(
+ Predicates.gteq(
+ new NameReference(fieldSchema.getName()),
+ buildLiteralExpression(values[0], fieldSchema.getType())),
+ Predicates.lteq(
+ new NameReference(fieldSchema.getName()),
+ buildLiteralExpression(values[values.length - 1],
fieldSchema.getType())));
})
.filter(Objects::nonNull)
.reduce(null, (result, expr) -> {
if (result == null) {
return expr;
} else {
- return BinaryOperator.and(result, expr);
+ return Predicates.and(result, expr);
}
});
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
index 0c85d44d068..3f1a19421ac 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
@@ -297,7 +297,7 @@ public class HoodieDataTableValidator implements
Serializable {
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try {
HoodieTableMetadata tableMetadata = new FileSystemBackedTableMetadata(
- engineContext, engineContext.getHadoopConf(), cfg.basePath,
cfg.assumeDatePartitioning);
+ engineContext, metaClient.getTableConfig(),
engineContext.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning);
List<Path> allDataFilePaths =
HoodieDataTableUtils.getBaseAndLogFilePathsFromFileSystem(tableMetadata,
cfg.basePath);
// verify that no data files present with commit time < earliest commit
in active timeline.
if (metaClient.getActiveTimeline().firstInstant().isPresent()) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
index 84fa604c76d..70146ef55c8 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
@@ -164,7 +164,7 @@ public class HoodieRepairTool {
.build();
this.tableMetadata = new FileSystemBackedTableMetadata(
- context, context.getHadoopConf(), cfg.basePath,
cfg.assumeDatePartitioning);
+ context, metaClient.getTableConfig(), context.getHadoopConf(),
cfg.basePath, cfg.assumeDatePartitioning);
}
public boolean run() {