codope commented on code in PR #11710:
URL: https://github.com/apache/hudi/pull/11710#discussion_r1728754590
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -181,6 +183,67 @@ object HoodieAnalysis extends SparkAdapterSupport {
*/
case class AdaptIngestionTargetLogicalRelations(spark: SparkSession) extends
Rule[LogicalPlan] {
+ /**
+ * The function updates the HadoopFSRelation so that it uses
HoodieFileIndex instead of
+ * HoodieFileIndexTimestampKeyGen. Also the data type for output
attributes of the plan are changed
+ * accordingly. HoodieFileIndexTimestampKeyGen is used by
HoodieBaseRelation for reading tables with
+ * Timestamp or custom key generator.
+ */
+ private def transformReaderFSRelation(logicalPlan: Option[LogicalPlan]):
Option[LogicalPlan] = {
+ def getAttributesFromTableSchema(catalogTableOpt: Option[CatalogTable],
lr: LogicalRelation, attributesSet: Set[AttributeReference]) = {
+ var finalAttrs: List[AttributeReference] = List.empty
+ if (catalogTableOpt.isDefined) {
+ for (attr <- lr.output) {
+ val origAttr: AttributeReference = attributesSet.collectFirst({
case a if a.name.equals(attr.name) => a }).get
+ val catalogAttr =
catalogTableOpt.get.partitionSchema.fields.collectFirst({ case a if
a.name.equals(attr.name) => a })
+ val newAttr: AttributeReference = if (catalogAttr.isDefined) {
+ origAttr.copy(dataType =
catalogAttr.get.dataType)(origAttr.exprId, origAttr.qualifier)
+ } else {
+ origAttr
+ }
+ finalAttrs = finalAttrs :+ newAttr
+ }
+ }
+ finalAttrs
+ }
+
+ def getHadoopFsRelation(plan: LogicalPlan): Option[HadoopFsRelation] = {
+ EliminateSubqueryAliases(plan) match {
+ // First, we need to weed out unresolved plans
+ case plan if !plan.resolved => None
+ // NOTE: When resolving Hudi table we allow [[Filter]]s and
[[Project]]s be applied
+ // on top of it
+ case PhysicalOperation(_, _, LogicalRelation(relation, _, _, _)) if
relation.isInstanceOf[HadoopFsRelation] =>
Some(relation.asInstanceOf[HadoopFsRelation])
+ case _ => None
+ }
+ }
+
+ logicalPlan.map(relation => {
+ val catalogTableOpt = sparkAdapter.resolveHoodieTable(relation)
+ val fsRelation = getHadoopFsRelation(relation)
+ if (fsRelation.isDefined &&
fsRelation.get.location.isInstanceOf[HoodieReaderFileIndex]) {
Review Comment:
minor: The check `if (fsRelation.isDefined &&
fsRelation.get.location.isInstanceOf[HoodieReaderFileIndex])` could be
refactored to:
```
fsRelation.collect {
case relation if relation.location.isInstanceOf[HoodieReaderFileIndex] =>
// Continue with the transformation
}.getOrElse(relation)
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala:
##########
@@ -365,10 +373,63 @@ class TestSparkSqlWithCustomKeyGenerator extends
HoodieSparkSqlTestBase {
// INSERT INTO should succeed now
testFirstRoundInserts(tableName, TS_FORMATTER_FUNC,
customPartitionFunc)
}
+
+ // Validate ts field is still of type int in the table
+ validateTsFieldSchema(tablePath)
}
}
}
+ test("Test query with custom key generator") {
+ withTempDir { tmp => {
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ val writePartitionFields = "ts:timestamp"
+ val dateFormat = "yyyy/MM/dd"
+ val tsGenFunc = (ts: Integer) => TS_FORMATTER_FUNC_WITH_FORMAT.apply(ts,
dateFormat)
+ val customPartitionFunc = (ts: Integer, _: String) => tsGenFunc.apply(ts)
+ val keyGenConfigs = TS_KEY_GEN_CONFIGS +
("hoodie.keygen.timebased.output.dateformat" -> dateFormat)
+
+ prepareTableWithKeyGenerator(
+ tableName, tablePath, "MERGE_ON_READ",
+ CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, keyGenConfigs)
+
+ createTableWithSql(tableName, tablePath,
+ s"hoodie.datasource.write.partitionpath.field =
'$writePartitionFields', "
+ + keyGenConfigs.map(e => e._1 + " = '" + e._2 + "'").mkString(", "))
+
+ // INSERT INTO should fail due to conflict between write and table
config of partition path fields
+ val sourceTableName = tableName + "_source"
+ prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227,
'cat1')"))
Review Comment:
please assert the exception if it is expected to fail.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java:
##########
@@ -317,7 +333,7 @@ public void
testPartitionFieldsInImproperFormat(TypedProperties props) {
keyGenerator.getPartitionPath(row);
Assertions.fail("should fail when partition key field is provided in
improper format!");
} catch (Exception e) {
-
Assertions.assertTrue(getNestedConstructorErrorCause(e).getMessage().contains("Unable
to find field names for partition path in proper format"));
+
assertTrue(getNestedConstructorErrorCause(e).getMessage().contains("Unable to
find field names for partition path in proper format"));
Review Comment:
Instead of asserting exception message, can we reuse existing Hoodie
exception or create a new one and assert that exception is thrown? I know this
is not related to your change but would be better to do that in a separate PR.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieReaderFileIndex.scala:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
+import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.hudi.keygen.{CustomAvroKeyGenerator, KeyGenUtils}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache,
PartitionDirectory}
+import org.apache.spark.sql.types.StructType
+
+class HoodieReaderFileIndex(override val spark: SparkSession,
+ override val metaClient: HoodieTableMetaClient,
+ override val schemaSpec: Option[StructType],
+ override val options: Map[String, String],
+ @transient override val fileStatusCache:
FileStatusCache = NoopCache,
+ override val includeLogFiles: Boolean = false,
+ override val shouldEmbedFileSlices: Boolean =
false)
+ extends HoodieFileIndex(
+ spark = spark,
+ metaClient = metaClient,
+ schemaSpec = schemaSpec,
+ options = options,
+ fileStatusCache = fileStatusCache,
+ includeLogFiles = includeLogFiles,
+ shouldEmbedFileSlices = shouldEmbedFileSlices) {
+
+ /**
+ *
+ * Returns set of indices with timestamp partition type. For Timestamp based
keygen, there is only one
+ * partition so index is 0. For custom keygen, it is the partition indices
for which partition type is
+ * timestamp.
+ */
+ private def getTimestampPartitionIndex(): Set[Int] = {
Review Comment:
we need test for this utility method.
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -181,6 +183,67 @@ object HoodieAnalysis extends SparkAdapterSupport {
*/
case class AdaptIngestionTargetLogicalRelations(spark: SparkSession) extends
Rule[LogicalPlan] {
+ /**
+ * The function updates the HadoopFSRelation so that it uses
HoodieFileIndex instead of
+ * HoodieFileIndexTimestampKeyGen. Also the data type for output
attributes of the plan are changed
+ * accordingly. HoodieFileIndexTimestampKeyGen is used by
HoodieBaseRelation for reading tables with
+ * Timestamp or custom key generator.
+ */
+ private def transformReaderFSRelation(logicalPlan: Option[LogicalPlan]):
Option[LogicalPlan] = {
+ def getAttributesFromTableSchema(catalogTableOpt: Option[CatalogTable],
lr: LogicalRelation, attributesSet: Set[AttributeReference]) = {
+ var finalAttrs: List[AttributeReference] = List.empty
+ if (catalogTableOpt.isDefined) {
+ for (attr <- lr.output) {
+ val origAttr: AttributeReference = attributesSet.collectFirst({
case a if a.name.equals(attr.name) => a }).get
+ val catalogAttr =
catalogTableOpt.get.partitionSchema.fields.collectFirst({ case a if
a.name.equals(attr.name) => a })
+ val newAttr: AttributeReference = if (catalogAttr.isDefined) {
+ origAttr.copy(dataType =
catalogAttr.get.dataType)(origAttr.exprId, origAttr.qualifier)
+ } else {
+ origAttr
+ }
+ finalAttrs = finalAttrs :+ newAttr
+ }
+ }
+ finalAttrs
+ }
+
+ def getHadoopFsRelation(plan: LogicalPlan): Option[HadoopFsRelation] = {
+ EliminateSubqueryAliases(plan) match {
+ // First, we need to weed out unresolved plans
+ case plan if !plan.resolved => None
+ // NOTE: When resolving Hudi table we allow [[Filter]]s and
[[Project]]s be applied
+ // on top of it
+ case PhysicalOperation(_, _, LogicalRelation(relation, _, _, _)) if
relation.isInstanceOf[HadoopFsRelation] =>
Some(relation.asInstanceOf[HadoopFsRelation])
+ case _ => None
+ }
+ }
+
+ logicalPlan.map(relation => {
+ val catalogTableOpt = sparkAdapter.resolveHoodieTable(relation)
+ val fsRelation = getHadoopFsRelation(relation)
+ if (fsRelation.isDefined &&
fsRelation.get.location.isInstanceOf[HoodieReaderFileIndex]) {
Review Comment:
under what scenarios, index is an instance of `HoodieReaderFileIndex`? Is it
only for insert into, update and merge into commands? Let's add in a comment.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala:
##########
@@ -365,10 +373,63 @@ class TestSparkSqlWithCustomKeyGenerator extends
HoodieSparkSqlTestBase {
// INSERT INTO should succeed now
testFirstRoundInserts(tableName, TS_FORMATTER_FUNC,
customPartitionFunc)
}
+
+ // Validate ts field is still of type int in the table
+ validateTsFieldSchema(tablePath)
}
}
}
+ test("Test query with custom key generator") {
+ withTempDir { tmp => {
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ val writePartitionFields = "ts:timestamp"
+ val dateFormat = "yyyy/MM/dd"
+ val tsGenFunc = (ts: Integer) => TS_FORMATTER_FUNC_WITH_FORMAT.apply(ts,
dateFormat)
+ val customPartitionFunc = (ts: Integer, _: String) => tsGenFunc.apply(ts)
+ val keyGenConfigs = TS_KEY_GEN_CONFIGS +
("hoodie.keygen.timebased.output.dateformat" -> dateFormat)
+
+ prepareTableWithKeyGenerator(
+ tableName, tablePath, "MERGE_ON_READ",
+ CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, keyGenConfigs)
+
+ createTableWithSql(tableName, tablePath,
+ s"hoodie.datasource.write.partitionpath.field =
'$writePartitionFields', "
+ + keyGenConfigs.map(e => e._1 + " = '" + e._2 + "'").mkString(", "))
+
+ // INSERT INTO should fail due to conflict between write and table
config of partition path fields
+ val sourceTableName = tableName + "_source"
+ prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227,
'cat1')"))
+ // INSERT INTO should succeed now
+ testFirstRoundInserts(tableName, tsGenFunc, customPartitionFunc)
+ assertEquals(7, spark.sql(
+ s"""
+ | SELECT * from $tableName
+ | """.stripMargin).count())
+ val incrementalDF = spark.read.format("hudi").
+ option("hoodie.datasource.query.type", "incremental").
+ option("hoodie.datasource.read.begin.instanttime", 0).
+ load(tablePath)
+ incrementalDF.createOrReplaceTempView("tbl_incremental")
+ assertEquals(7, spark.sql(
+ s"""
+ | SELECT * from tbl_incremental
+ | """.stripMargin).count())
+
+ // Validate ts field is still of type int in the table
+ validateTsFieldSchema(tablePath)
+ }
+ }
+ }
+
+ private def validateTsFieldSchema(tablePath: String): Unit = {
Review Comment:
we can make it more reusable, by taking field name and expected type in the
argument.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieReaderFileIndex.scala:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
+import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.hudi.keygen.{CustomAvroKeyGenerator, KeyGenUtils}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache,
PartitionDirectory}
+import org.apache.spark.sql.types.StructType
+
+class HoodieReaderFileIndex(override val spark: SparkSession,
Review Comment:
please add a detailed scaladoc for this class. Why it is needed and how it
is different from `SparkHoodieTableFileIndex`?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -181,6 +183,67 @@ object HoodieAnalysis extends SparkAdapterSupport {
*/
case class AdaptIngestionTargetLogicalRelations(spark: SparkSession) extends
Rule[LogicalPlan] {
+ /**
+ * The function updates the HadoopFSRelation so that it uses
HoodieFileIndex instead of
+ * HoodieFileIndexTimestampKeyGen. Also the data type for output
attributes of the plan are changed
+ * accordingly. HoodieFileIndexTimestampKeyGen is used by
HoodieBaseRelation for reading tables with
+ * Timestamp or custom key generator.
+ */
+ private def transformReaderFSRelation(logicalPlan: Option[LogicalPlan]):
Option[LogicalPlan] = {
+ def getAttributesFromTableSchema(catalogTableOpt: Option[CatalogTable],
lr: LogicalRelation, attributesSet: Set[AttributeReference]) = {
+ var finalAttrs: List[AttributeReference] = List.empty
+ if (catalogTableOpt.isDefined) {
+ for (attr <- lr.output) {
+ val origAttr: AttributeReference = attributesSet.collectFirst({
case a if a.name.equals(attr.name) => a }).get
+ val catalogAttr =
catalogTableOpt.get.partitionSchema.fields.collectFirst({ case a if
a.name.equals(attr.name) => a })
+ val newAttr: AttributeReference = if (catalogAttr.isDefined) {
+ origAttr.copy(dataType =
catalogAttr.get.dataType)(origAttr.exprId, origAttr.qualifier)
+ } else {
+ origAttr
+ }
+ finalAttrs = finalAttrs :+ newAttr
+ }
+ }
+ finalAttrs
+ }
+
+ def getHadoopFsRelation(plan: LogicalPlan): Option[HadoopFsRelation] = {
+ EliminateSubqueryAliases(plan) match {
+ // First, we need to weed out unresolved plans
+ case plan if !plan.resolved => None
+ // NOTE: When resolving Hudi table we allow [[Filter]]s and
[[Project]]s be applied
+ // on top of it
+ case PhysicalOperation(_, _, LogicalRelation(relation, _, _, _)) if
relation.isInstanceOf[HadoopFsRelation] =>
Some(relation.asInstanceOf[HadoopFsRelation])
+ case _ => None
+ }
+ }
+
+ logicalPlan.map(relation => {
+ val catalogTableOpt = sparkAdapter.resolveHoodieTable(relation)
+ val fsRelation = getHadoopFsRelation(relation)
+ if (fsRelation.isDefined &&
fsRelation.get.location.isInstanceOf[HoodieReaderFileIndex]) {
+ relation transformUp {
+ case lr: LogicalRelation =>
+ val finalAttrs: List[AttributeReference] =
getAttributesFromTableSchema(catalogTableOpt, lr, lr.output.toSet)
+ val newFsRelation: BaseRelation = lr.relation match {
+ case fsRelation: HadoopFsRelation =>
+ if (catalogTableOpt.isDefined) {
+ // replace HoodieFileIndexTimestampKeyGen with
HoodieFileIndex instance
Review Comment:
here again, i think you meant replace `HoodieReaderFileIndex`
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -181,6 +183,67 @@ object HoodieAnalysis extends SparkAdapterSupport {
*/
case class AdaptIngestionTargetLogicalRelations(spark: SparkSession) extends
Rule[LogicalPlan] {
+ /**
+ * The function updates the HadoopFSRelation so that it uses
HoodieFileIndex instead of
+ * HoodieFileIndexTimestampKeyGen. Also the data type for output
attributes of the plan are changed
+ * accordingly. HoodieFileIndexTimestampKeyGen is used by
HoodieBaseRelation for reading tables with
+ * Timestamp or custom key generator.
+ */
+ private def transformReaderFSRelation(logicalPlan: Option[LogicalPlan]):
Option[LogicalPlan] = {
Review Comment:
How often is this function called during the lifecycle of a query?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -181,6 +183,67 @@ object HoodieAnalysis extends SparkAdapterSupport {
*/
case class AdaptIngestionTargetLogicalRelations(spark: SparkSession) extends
Rule[LogicalPlan] {
+ /**
+ * The function updates the HadoopFSRelation so that it uses
HoodieFileIndex instead of
+ * HoodieFileIndexTimestampKeyGen. Also the data type for output
attributes of the plan are changed
Review Comment:
`HoodieFileIndexTimestampKeyGen` -> i don't see this class anywhere used.
Did you mean to use `HoodieReaderFileIndex`?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieReaderFileIndex.scala:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
+import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.hudi.keygen.{CustomAvroKeyGenerator, KeyGenUtils}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache,
PartitionDirectory}
+import org.apache.spark.sql.types.StructType
+
+class HoodieReaderFileIndex(override val spark: SparkSession,
Review Comment:
Where are we ensuring that `HoodieReaderFileIndex` is only used when table
uses custom keygen? In `HoodieHadoopFsRelationFactory`, I see that it is being
used as the default file index. Is that expected?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -111,45 +111,12 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
/**
* Get the partition schema from the hoodie.properties.
*/
- private lazy val _partitionSchemaFromProperties: StructType = {
- val tableConfig = metaClient.getTableConfig
- val partitionColumns = tableConfig.getPartitionFields
- val nameFieldMap = generateFieldMap(schema)
-
- if (partitionColumns.isPresent) {
- // Note that key generator class name could be null
- val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
- if
(classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
- ||
classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName))
{
- val partitionFields: Array[StructField] =
partitionColumns.get().map(column => StructField(column, StringType))
- StructType(partitionFields)
- } else {
- val partitionFields: Array[StructField] =
partitionColumns.get().filter(column => nameFieldMap.contains(column))
- .map(column => nameFieldMap.apply(column))
-
- if (partitionFields.length != partitionColumns.get().length) {
- val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent
- if (isBootstrapTable) {
- // For bootstrapped tables its possible the schema does not
contain partition field when source table
- // is hive style partitioned. In this case we would like to treat
the table as non-partitioned
- // as opposed to failing
- new StructType()
- } else {
- throw new IllegalArgumentException(s"Cannot find columns: " +
- s"'${partitionColumns.get().filter(col =>
!nameFieldMap.contains(col)).mkString(",")}' " +
- s"in the schema[${schema.fields.mkString(",")}]")
- }
- } else {
- new StructType(partitionFields)
- }
- }
- } else {
- // If the partition columns have not stored in hoodie.properties(the
table that was
- // created earlier), we trait it as a non-partitioned table.
- logWarning("No partition columns available from hoodie.properties." +
- " Partition pruning will not work")
- new StructType()
- }
+ lazy val _partitionSchemaFromProperties: StructType = {
+ getPartitionSchema()
+ }
+
+ def getPartitionSchema(): StructType = {
+ sparkParsePartitionUtil.getPartitionSchema(metaClient.getTableConfig,
schema, handleCustomKeyGenerator = false)
Review Comment:
Why is `handleCustomKeyGenerator` set to false?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieReaderFileIndex.scala:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
+import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.hudi.keygen.{CustomAvroKeyGenerator, KeyGenUtils}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache,
PartitionDirectory}
+import org.apache.spark.sql.types.StructType
+
+class HoodieReaderFileIndex(override val spark: SparkSession,
+ override val metaClient: HoodieTableMetaClient,
+ override val schemaSpec: Option[StructType],
+ override val options: Map[String, String],
+ @transient override val fileStatusCache:
FileStatusCache = NoopCache,
+ override val includeLogFiles: Boolean = false,
+ override val shouldEmbedFileSlices: Boolean =
false)
+ extends HoodieFileIndex(
+ spark = spark,
+ metaClient = metaClient,
+ schemaSpec = schemaSpec,
+ options = options,
+ fileStatusCache = fileStatusCache,
+ includeLogFiles = includeLogFiles,
+ shouldEmbedFileSlices = shouldEmbedFileSlices) {
+
+ /**
+ *
+ * Returns set of indices with timestamp partition type. For Timestamp based
keygen, there is only one
+ * partition so index is 0. For custom keygen, it is the partition indices
for which partition type is
+ * timestamp.
+ */
+ private def getTimestampPartitionIndex(): Set[Int] = {
+ val tableConfig = metaClient.getTableConfig
+ val keyGeneratorClassNameOpt =
Option.apply(tableConfig.getKeyGeneratorClassName)
+ val recordKeyFieldOpt =
common.util.Option.ofNullable(tableConfig.getRawRecordKeyFieldProp)
+ val keyGeneratorClassName =
keyGeneratorClassNameOpt.getOrElse(KeyGenUtils.inferKeyGeneratorType(recordKeyFieldOpt,
tableConfig.getPartitionFieldProp).getClassName)
+ if (keyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP.getClassName)
+ ||
keyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP_AVRO.getClassName)) {
+ Set(0)
+ } else if
(keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM.getClassName)
+ ||
keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM_AVRO.getClassName)) {
+ val partitionFields =
HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(java.util.Collections.emptyList[String]())
+ val partitionTypes =
CustomAvroKeyGenerator.getPartitionTypes(partitionFields)
+ var partitionIndexes: Set[Int] = Set.empty
+ for (i <- 0 until partitionTypes.size()) {
+ if (partitionTypes.get(i).equals(PartitionKeyType.TIMESTAMP)) {
+ partitionIndexes = partitionIndexes + i
+ }
+ }
+ partitionIndexes
+ } else {
+ Set.empty
+ }
+ }
+
+ /**
+ * Invoked by Spark to fetch list of latest base files per partition.
+ *
+ * @param partitionFilters partition column filters
+ * @param dataFilters data columns filters
+ * @return list of PartitionDirectory containing partition to base files
mapping
+ */
+ override def listFiles(partitionFilters: Seq[Expression], dataFilters:
Seq[Expression]): Seq[PartitionDirectory] = {
+ val partitionDirectories = super.listFiles(partitionFilters, dataFilters)
Review Comment:
It first calls `super.listFiles` and then processes the results. Consider
processing the files as they are fetched rather than after the entire list is
retrieved, which could help with memory usage in cases with a large number of
partitions or files.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieReaderFileIndex.scala:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
+import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.hudi.keygen.{CustomAvroKeyGenerator, KeyGenUtils}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache,
PartitionDirectory}
+import org.apache.spark.sql.types.StructType
+
+class HoodieReaderFileIndex(override val spark: SparkSession,
+ override val metaClient: HoodieTableMetaClient,
+ override val schemaSpec: Option[StructType],
+ override val options: Map[String, String],
+ @transient override val fileStatusCache:
FileStatusCache = NoopCache,
+ override val includeLogFiles: Boolean = false,
+ override val shouldEmbedFileSlices: Boolean =
false)
+ extends HoodieFileIndex(
+ spark = spark,
+ metaClient = metaClient,
+ schemaSpec = schemaSpec,
+ options = options,
+ fileStatusCache = fileStatusCache,
+ includeLogFiles = includeLogFiles,
+ shouldEmbedFileSlices = shouldEmbedFileSlices) {
+
+ /**
+ *
+ * Returns set of indices with timestamp partition type. For Timestamp based
keygen, there is only one
+ * partition so index is 0. For custom keygen, it is the partition indices
for which partition type is
+ * timestamp.
+ */
+ private def getTimestampPartitionIndex(): Set[Int] = {
+ val tableConfig = metaClient.getTableConfig
+ val keyGeneratorClassNameOpt =
Option.apply(tableConfig.getKeyGeneratorClassName)
+ val recordKeyFieldOpt =
common.util.Option.ofNullable(tableConfig.getRawRecordKeyFieldProp)
+ val keyGeneratorClassName =
keyGeneratorClassNameOpt.getOrElse(KeyGenUtils.inferKeyGeneratorType(recordKeyFieldOpt,
tableConfig.getPartitionFieldProp).getClassName)
+ if (keyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP.getClassName)
+ ||
keyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP_AVRO.getClassName)) {
+ Set(0)
+ } else if
(keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM.getClassName)
+ ||
keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM_AVRO.getClassName)) {
+ val partitionFields =
HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(java.util.Collections.emptyList[String]())
+ val partitionTypes =
CustomAvroKeyGenerator.getPartitionTypes(partitionFields)
+ var partitionIndexes: Set[Int] = Set.empty
+ for (i <- 0 until partitionTypes.size()) {
+ if (partitionTypes.get(i).equals(PartitionKeyType.TIMESTAMP)) {
+ partitionIndexes = partitionIndexes + i
+ }
+ }
+ partitionIndexes
+ } else {
+ Set.empty
+ }
+ }
+
+ /**
+ * Invoked by Spark to fetch list of latest base files per partition.
+ *
+ * @param partitionFilters partition column filters
+ * @param dataFilters data columns filters
+ * @return list of PartitionDirectory containing partition to base files
mapping
+ */
+ override def listFiles(partitionFilters: Seq[Expression], dataFilters:
Seq[Expression]): Seq[PartitionDirectory] = {
Review Comment:
Is this new index tested anywhere?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala:
##########
@@ -18,17 +18,131 @@
package org.apache.spark.sql.execution.datasources
import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.util
+import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
+import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator,
CustomKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StringType, StructField,
StructType}
import java.util.TimeZone
-trait SparkParsePartitionUtil extends Serializable {
+trait SparkParsePartitionUtil extends Serializable with Logging {
def parsePartition(path: Path,
typeInference: Boolean,
basePaths: Set[Path],
userSpecifiedDataTypes: Map[String, DataType],
timeZone: TimeZone,
validatePartitionValues: Boolean = false): InternalRow
+
+ def getPartitionSchema(tableConfig: HoodieTableConfig, schema: StructType,
handleCustomKeyGenerator: Boolean): StructType = {
Review Comment:
Please add unit tests for util functions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]