This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f7544e23ac [HUDI-3204] Fixing partition-values being derived from
partition-path instead of source columns (#5364)
f7544e23ac is described below
commit f7544e23ac6899b2d8b28a01c79caad1facd1379
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Wed Apr 20 04:30:27 2022 -0700
[HUDI-3204] Fixing partition-values being derived from partition-path
instead of source columns (#5364)
- Scaffolded `Spark24HoodieParquetFileFormat` extending
`ParquetFileFormat` and overriding the behavior of adding partition columns to
every row
- Amended `SparkAdapter`'s `createHoodieParquetFileFormat` API to be able
to configure whether to append partition values or not
- Falling back to appending partition values in cases when the source columns are
not persisted in the data file
- Fixing HoodieBaseRelation incorrectly handling mandatory columns
---
.../spark/sql/HoodieCatalystExpressionUtils.scala | 20 +-
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 4 +-
.../java/org/apache/hudi/avro/AvroSchemaUtils.java | 112 +++++
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 80 +---
.../hudi/common/model/HoodiePartitionMetadata.java | 5 +-
.../hudi/common/table/HoodieTableConfig.java | 2 +-
.../hudi/common/table/TableSchemaResolver.java | 17 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 2 +-
.../hudi/common/table/TestTableSchemaResolver.java | 3 +-
.../utils/HoodieRealtimeRecordReaderUtils.java | 29 +-
...org.apache.spark.sql.sources.DataSourceRegister | 2 +-
.../org/apache/hudi/BaseFileOnlyRelation.scala | 36 +-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 208 +++++----
.../org/apache/hudi/HoodieDataSourceHelper.scala | 20 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 4 +-
.../org/apache/hudi/IncrementalRelation.scala | 5 +-
.../hudi/MergeOnReadIncrementalRelation.scala | 12 +-
.../apache/hudi/MergeOnReadSnapshotRelation.scala | 13 +-
.../apache/hudi/SparkHoodieTableFileIndex.scala | 3 +
...eFormat.scala => HoodieParquetFileFormat.scala} | 30 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 28 +-
.../hudi/functional/TestCOWDataSourceStorage.scala | 1 -
.../apache/spark/sql/adapter/Spark2Adapter.scala | 6 +-
.../parquet/Spark24HoodieParquetFileFormat.scala | 229 ++++++++++
.../apache/spark/sql/adapter/Spark3_1Adapter.scala | 22 +-
.../parquet/Spark312HoodieParquetFileFormat.scala | 507 +++++++++++----------
.../apache/spark/sql/adapter/Spark3_2Adapter.scala | 13 +-
.../parquet/Spark32HoodieParquetFileFormat.scala | 449 ++++++++++--------
28 files changed, 1166 insertions(+), 696 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
index fe30f61b92..a3b9c210b9 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
@@ -18,12 +18,30 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute,
UnresolvedFunction}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression, SubqueryExpression}
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation,
LogicalPlan}
import org.apache.spark.sql.types.StructType
trait HoodieCatalystExpressionUtils {
+ /**
+ * Generates instance of [[UnsafeProjection]] projecting row of one
[[StructType]] into another [[StructType]]
+ *
+ * NOTE: No safety checks are executed to validate that this projection is
actually feasible,
+ * it's up to the caller to make sure that such projection is possible.
+ *
+ * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is
only possible, if
+ * B is a subset of A
+ */
+ def generateUnsafeProjection(from: StructType, to: StructType):
UnsafeProjection = {
+ val attrs = from.toAttributes
+ val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
+ val targetExprs = to.fields.map(f => attrsMap(f.name))
+
+ GenerateUnsafeProjection.generate(targetExprs, attrs)
+ }
+
/**
* Parses and resolves expression against the attributes of the given table
schema.
*
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index d8ed173547..a97743e62f 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -177,7 +177,7 @@ trait SparkAdapter extends Serializable {
def createResolveHudiAlterTableCommand(sparkSession: SparkSession):
Rule[LogicalPlan]
/**
- * Create hoodie parquet file format.
+ * Create instance of [[ParquetFileFormat]]
*/
- def createHoodieParquetFileFormat(): Option[ParquetFileFormat]
+ def createHoodieParquetFileFormat(appendPartitionValues: Boolean):
Option[ParquetFileFormat]
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
new file mode 100644
index 0000000000..dd14dca671
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+public class AvroSchemaUtils {
+
+ private AvroSchemaUtils() {}
+
+ /**
+ * Appends provided new fields at the end of the given schema
+ *
+ * NOTE: No deduplication is made, this method simply appends fields at the
end of the list
+ * of the source schema as is
+ */
+ public static Schema appendFieldsToSchema(Schema schema, List<Schema.Field>
newFields) {
+ List<Schema.Field> fields = schema.getFields().stream()
+ .map(field -> new Schema.Field(field.name(), field.schema(),
field.doc(), field.defaultVal()))
+ .collect(Collectors.toList());
+ fields.addAll(newFields);
+
+ Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(),
schema.getNamespace(), schema.isError());
+ newSchema.setFields(fields);
+ return newSchema;
+ }
+
+ /**
+ * Given a {@code Union} schema, resolves the non-null member type whose full name equals
{@code fieldSchemaFullName}
+ * w/in the union, returning its corresponding schema (non-union schemas are returned as is)
+ *
+ * @param schema target schema to be inspected
+ * @param fieldSchemaFullName target field-name to be looked up w/in the
union
+ * @return schema of the field w/in the union identified by the {@code
fieldSchemaFullName}
+ */
+ public static Schema resolveUnionSchema(Schema schema, String
fieldSchemaFullName) {
+ if (schema.getType() != Schema.Type.UNION) {
+ return schema;
+ }
+
+ List<Schema> innerTypes = schema.getTypes();
+ Schema nonNullType =
+ innerTypes.stream()
+ .filter(it -> it.getType() != Schema.Type.NULL &&
Objects.equals(it.getFullName(), fieldSchemaFullName))
+ .findFirst()
+ .orElse(null);
+
+ if (nonNullType == null) {
+ throw new AvroRuntimeException(
+ String.format("Unsupported Avro UNION type %s: Only UNION of a null
type and a non-null type is supported", schema));
+ }
+
+ return nonNullType;
+ }
+
+ /**
+ * Resolves typical Avro's nullable schema definition: {@code
Union(Schema.Type.NULL, <NonNullType>)},
+ * decomposing union and returning the target non-null type
+ */
+ public static Schema resolveNullableSchema(Schema schema) {
+ if (schema.getType() != Schema.Type.UNION) {
+ return schema;
+ }
+
+ List<Schema> innerTypes = schema.getTypes();
+ Schema nonNullType =
+ innerTypes.stream()
+ .filter(it -> it.getType() != Schema.Type.NULL)
+ .findFirst()
+ .orElse(null);
+
+ if (innerTypes.size() != 2 || nonNullType == null) {
+ throw new AvroRuntimeException(
+ String.format("Unsupported Avro UNION type %s: Only UNION of a null
type and a non-null type is supported", schema));
+ }
+
+ return nonNullType;
+ }
+
+ /**
+ * Creates schema following Avro's typical nullable schema definition:
{@code Union(Schema.Type.NULL, <NonNullType>)},
+ * wrapping around provided target non-null type
+ */
+ public static Schema createNullableSchema(Schema.Type avroType) {
+ checkState(avroType != Schema.Type.NULL);
+ return Schema.createUnion(Schema.create(Schema.Type.NULL),
Schema.create(avroType));
+ }
+
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 9367e23dc6..bf540a302e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -19,7 +19,6 @@
package org.apache.hudi.avro;
import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.SchemaCompatibility;
import org.apache.avro.Conversions;
import org.apache.avro.Conversions.DecimalConversion;
import org.apache.avro.JsonProperties;
@@ -27,6 +26,7 @@ import org.apache.avro.LogicalTypes;
import org.apache.avro.LogicalTypes.Decimal;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
@@ -64,19 +64,19 @@ import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.HashMap;
import java.util.TimeZone;
-import java.util.Iterator;
-
import java.util.stream.Collectors;
import static org.apache.avro.Schema.Type.UNION;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
/**
* Helper class to do common stuff across Avro.
@@ -97,8 +97,7 @@ public class HoodieAvroUtils {
private static final String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";
// All metadata fields are optional strings.
- public static final Schema METADATA_FIELD_SCHEMA =
- Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.STRING)));
+ public static final Schema METADATA_FIELD_SCHEMA =
createNullableSchema(Schema.Type.STRING);
public static final Schema RECORD_KEY_SCHEMA = initRecordKeySchema();
@@ -327,31 +326,6 @@ public class HoodieAvroUtils {
return record;
}
- /**
- * Add null fields to passed in schema. Caller is responsible for ensuring
there is no duplicates. As different query
- * engines have varying constraints regarding treating the case-sensitivity
of fields, its best to let caller
- * determine that.
- *
- * @param schema Passed in schema
- * @param newFieldNames Null Field names to be added
- */
- public static Schema appendNullSchemaFields(Schema schema, List<String>
newFieldNames) {
- List<Field> newFields = new ArrayList<>();
- for (String newField : newFieldNames) {
- newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "",
JsonProperties.NULL_VALUE));
- }
- return createNewSchemaWithExtraFields(schema, newFields);
- }
-
- public static Schema createNewSchemaWithExtraFields(Schema schema,
List<Field> newFields) {
- List<Field> fields = schema.getFields().stream()
- .map(field -> new Field(field.name(), field.schema(), field.doc(),
field.defaultVal())).collect(Collectors.toList());
- fields.addAll(newFields);
- Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(),
schema.getNamespace(), schema.isError());
- newSchema.setFields(fields);
- return newSchema;
- }
-
/**
* Adds the Hoodie commit metadata into the provided Generic Record.
*/
@@ -736,46 +710,6 @@ public class HoodieAvroUtils {
return getRecordColumnValues(record, columns, schema.get(),
consistentLogicalTimestampEnabled);
}
- private static Schema resolveUnionSchema(Schema schema, String
fieldSchemaFullName) {
- if (schema.getType() != Schema.Type.UNION) {
- return schema;
- }
-
- List<Schema> innerTypes = schema.getTypes();
- Schema nonNullType =
- innerTypes.stream()
- .filter(it -> it.getType() != Schema.Type.NULL &&
Objects.equals(it.getFullName(), fieldSchemaFullName))
- .findFirst()
- .orElse(null);
-
- if (nonNullType == null) {
- throw new AvroRuntimeException(
- String.format("Unsupported Avro UNION type %s: Only UNION of a null
type and a non-null type is supported", schema));
- }
-
- return nonNullType;
- }
-
- public static Schema resolveNullableSchema(Schema schema) {
- if (schema.getType() != Schema.Type.UNION) {
- return schema;
- }
-
- List<Schema> innerTypes = schema.getTypes();
- Schema nonNullType =
- innerTypes.stream()
- .filter(it -> it.getType() != Schema.Type.NULL)
- .findFirst()
- .orElse(null);
-
- if (innerTypes.size() != 2 || nonNullType == null) {
- throw new AvroRuntimeException(
- String.format("Unsupported Avro UNION type %s: Only UNION of a null
type and a non-null type is supported", schema));
- }
-
- return nonNullType;
- }
-
/**
* Given a avro record with a given schema, rewrites it into the new schema
while setting fields only from the new schema.
* support deep rewrite for nested record.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index 93e9ea5d34..89bad1c33f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -213,7 +213,7 @@ public class HoodiePartitionMetadata {
format = Option.empty();
return true;
} catch (Throwable t) {
- LOG.warn("Unable to read partition meta properties file for partition "
+ partitionPath, t);
+ LOG.debug("Unable to read partition meta properties file for partition "
+ partitionPath);
return false;
}
}
@@ -229,8 +229,7 @@ public class HoodiePartitionMetadata {
format = Option.of(reader.getFormat());
return true;
} catch (Throwable t) {
- // any error, log, check the next base format
- LOG.warn("Unable to read partition metadata " + metafilePath.getName()
+ " for partition " + partitionPath, t);
+ LOG.debug("Unable to read partition metadata " +
metafilePath.getName() + " for partition " + partitionPath);
}
}
return false;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 254044bd28..bbc508bd5f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -607,7 +607,7 @@ public class HoodieTableConfig extends HoodieConfig {
return getString(URL_ENCODE_PARTITIONING);
}
- public Boolean isDropPartitionColumns() {
+ public Boolean shouldDropPartitionColumns() {
return getBooleanOrDefault(DROP_PARTITION_COLUMNS);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 262157a8ae..f178a23eee 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -23,11 +23,9 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.IndexedRecord;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -47,15 +45,13 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIncompatibleSchemaException;
import org.apache.hudi.exception.InvalidTableException;
-import org.apache.hudi.io.storage.HoodieHFileReader;
-import org.apache.hudi.io.storage.HoodieOrcReader;
import org.apache.hudi.internal.schema.InternalSchema;
import
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
-
+import org.apache.hudi.io.storage.HoodieHFileReader;
+import org.apache.hudi.io.storage.HoodieOrcReader;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -67,6 +63,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
+
/**
* Helper class to read schema from data files and log files and to convert it
between different formats.
*
@@ -189,7 +188,7 @@ public class TableSchemaResolver {
}
Option<String[]> partitionFieldsOpt =
metaClient.getTableConfig().getPartitionFields();
- if (metaClient.getTableConfig().isDropPartitionColumns()) {
+ if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt,
schema);
}
return schema;
@@ -222,9 +221,9 @@ public class TableSchemaResolver {
List<Field> newFields = new ArrayList<>();
for (String partitionField: partitionFields) {
newFields.add(new Schema.Field(
- partitionField, Schema.create(Schema.Type.STRING), "",
JsonProperties.NULL_VALUE));
+ partitionField, createNullableSchema(Schema.Type.STRING), "",
JsonProperties.NULL_VALUE));
}
- schema = HoodieAvroUtils.createNewSchemaWithExtraFields(schema,
newFields);
+ schema = appendFieldsToSchema(schema, newFields);
}
}
return schema;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 3904ff6f83..c0e97f3309 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -89,10 +89,10 @@ import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static
org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
import static
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
-import static org.apache.hudi.avro.HoodieAvroUtils.resolveNullableSchema;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static
org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 59a24a79f0..e0e57e812b 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table;
import org.apache.avro.Schema;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
@@ -57,7 +58,7 @@ public class TestTableSchemaResolver {
assertNotEquals(originSchema, s4);
assertTrue(s4.getFields().stream().anyMatch(f ->
f.name().equals("user_partition")));
Schema.Field f = s4.getField("user_partition");
- assertEquals(f.schema().getType().getName(), "string");
+ assertEquals(f.schema(),
AvroSchemaUtils.createNullableSchema(Schema.Type.STRING));
// case5: user_partition is in originSchema, but partition_path is in
originSchema
String[] pts4 = {"user_partition", "partition_path"};
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 0aa74ef154..0e4f9c304c 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,13 +18,7 @@
package org.apache.hudi.hadoop.utils;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
@@ -32,8 +26,8 @@ import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
@@ -46,6 +40,12 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -60,6 +60,9 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
+import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
+
public class HoodieRealtimeRecordReaderUtils {
private static final Logger LOG =
LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);
@@ -287,6 +290,14 @@ public class HoodieRealtimeRecordReaderUtils {
List<String> fieldsToAdd =
partitioningFields.stream().map(String::toLowerCase)
.filter(x ->
!firstLevelFieldNames.contains(x)).collect(Collectors.toList());
- return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd);
+ return appendNullSchemaFields(schema, fieldsToAdd);
+ }
+
+ private static Schema appendNullSchemaFields(Schema schema, List<String>
newFieldNames) {
+ List<Schema.Field> newFields = new ArrayList<>();
+ for (String newField : newFieldNames) {
+ newFields.add(new Schema.Field(newField,
createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
+ }
+ return appendFieldsToSchema(schema, newFields);
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index cc8fb0492a..556b0feef1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -17,4 +17,4 @@
org.apache.hudi.DefaultSource
-org.apache.spark.sql.execution.datasources.parquet.SparkHoodieParquetFileFormat
\ No newline at end of file
+org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index adf94fffde..5414a228c7 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -20,14 +20,13 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import
org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat,
ParquetFileFormat}
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -56,6 +55,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
override type FileSplit = HoodieBaseFileSplit
override lazy val mandatoryColumns: Seq[String] =
+ // TODO reconcile, record's key shouldn't be mandatory for base-file only
relation
Seq(recordKeyField)
override def imbueConfigs(sqlContext: SQLContext): Unit = {
@@ -65,14 +65,14 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
partitionSchema: StructType,
- tableSchema: HoodieTableSchema,
+ dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]): HoodieUnsafeRDD =
{
val baseFileReader = createBaseFileReader(
spark = sparkSession,
partitionSchema = partitionSchema,
- tableSchema = tableSchema,
+ dataSchema = dataSchema,
requiredSchema = requiredSchema,
filters = filters,
options = optParams,
@@ -114,16 +114,38 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
* rule; you can find more details in HUDI-3896)
*/
def toHadoopFsRelation: HadoopFsRelation = {
+ // We're delegating to Spark to append partition values to every row only
in cases
+ // when these corresponding partition-values are not persisted w/in the
data file itself
+ val shouldAppendPartitionColumns = shouldOmitPartitionColumns
+
val (tableFileFormat, formatClassName) =
metaClient.getTableConfig.getBaseFileFormat match {
- case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+ case HoodieFileFormat.PARQUET =>
+
(sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get,
HoodieParquetFileFormat.FILE_FORMAT_ID)
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
}
if (globPaths.isEmpty) {
+ // NOTE: There are currently 2 ways partition values could be fetched:
+ // - Source columns (producing the values used for physical
partitioning) will be read
+ // from the data file
+ // - Values parsed from the actual partition path would be
appended to the final dataset
+ //
+ // In the former case, we don't need to provide the
partition-schema to the relation,
+ // therefore we simply stub it w/ empty schema and use full
table-schema as the one being
+ // read from the data file.
+ //
+ // In the latter, we have to specify proper partition schema as
well as "data"-schema, essentially
+ // being a table-schema with all partition columns stripped out
+ val (partitionSchema, dataSchema) = if (shouldAppendPartitionColumns) {
+ (fileIndex.partitionSchema, fileIndex.dataSchema)
+ } else {
+ (StructType(Nil), tableStructSchema)
+ }
+
HadoopFsRelation(
location = fileIndex,
- partitionSchema = fileIndex.partitionSchema,
- dataSchema = fileIndex.dataSchema,
+ partitionSchema = partitionSchema,
+ dataSchema = dataSchema,
bucketSpec = None,
fileFormat = tableFileFormat,
optParams)(sparkSession)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 53667f3b88..2fd1da5950 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation.getPartitionPath
+import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema,
createHFileReader, generateUnsafeProjection, getPartitionPath}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.{HoodieMetadataConfig,
SerializableConfiguration}
import org.apache.hudi.common.fs.FSUtils
@@ -36,12 +36,13 @@ import
org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.io.storage.HoodieHFileReader
-import org.apache.spark.TaskContext
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.HoodieAvroSchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression,
SubqueryExpression}
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Expression,
SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources.{FileStatusCache,
PartitionedFile, PartitioningUtils}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -50,11 +51,11 @@ import org.apache.spark.sql.types.{StringType, StructField,
StructType}
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
-import java.io.Closeable
import java.net.URI
+import java.util.Locale
import scala.collection.JavaConverters._
-import scala.util.Try
import scala.util.control.NonFatal
+import scala.util.{Failure, Success, Try}
trait HoodieFileSplit {}
@@ -78,7 +79,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
extends BaseRelation
with FileRelation
with PrunedFilteredScan
- with SparkAdapterSupport
with Logging {
type FileSplit <: HoodieFileSplit
@@ -125,14 +125,17 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema)
= {
val schemaUtil = new TableSchemaResolver(metaClient)
- val avroSchema = Try(schemaUtil.getTableAvroSchema).getOrElse(
- // If there is no commit in the table, we can't get the schema
- // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]]
instead.
- userSchema match {
- case Some(s) => sparkAdapter.getAvroSchemaConverters.toAvroType(s,
nullable = false, "record")
- case _ => throw new IllegalArgumentException("User-provided schema is
required in case the table is empty")
- }
- )
+ val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
+ case Success(schema) => schema
+ case Failure(e) =>
+ logWarning("Failed to fetch schema from the table", e)
+ // If there is no commit in the table, we can't get the schema
+ // t/h [[TableSchemaResolver]], fallback to the provided
[[userSchema]] instead.
+ userSchema match {
+ case Some(s) => convertToAvroSchema(s)
+ case _ => throw new IllegalArgumentException("User-provided schema
is required in case the table is empty")
+ }
+ }
// try to find internalSchema
val internalSchemaFromMeta = try {
schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
@@ -146,11 +149,8 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
protected val partitionColumns: Array[String] =
tableConfig.getPartitionFields.orElse(Array.empty)
- /**
- * if true, need to deal with schema for creating file reader.
- */
- protected val dropPartitionColumnsWhenWrite: Boolean =
- metaClient.getTableConfig.isDropPartitionColumns &&
partitionColumns.nonEmpty
+ protected val shouldOmitPartitionColumns: Boolean =
+ metaClient.getTableConfig.shouldDropPartitionColumns &&
partitionColumns.nonEmpty
/**
* NOTE: PLEASE READ THIS CAREFULLY
@@ -205,14 +205,19 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
* NOTE: DO NOT OVERRIDE THIS METHOD
*/
override final def buildScan(requiredColumns: Array[String], filters:
Array[Filter]): RDD[Row] = {
- // NOTE: In case list of requested columns doesn't contain the Primary Key
one, we
+ // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES
+ //
+ // In case list of requested columns doesn't contain the Primary Key
one, we
// have to add it explicitly so that
// - Merging could be performed correctly
// - In case 0 columns are to be fetched (for ex, when doing
{@code count()} on Spark's [[Dataset]],
- // Spark still fetches all the rows to execute the query correctly
+ // Spark still fetches all the rows to execute the query
correctly
//
- // It's okay to return columns that have not been requested by the
caller, as those nevertheless will be
- // filtered out upstream
+ // *Appending* additional columns to the ones requested by the
caller is not a problem, as those
+ // will be "projected out" by the caller's projection;
+ //
+ // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS
THIS WILL BREAK THE UPSTREAM
+ // PROJECTION
val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
@@ -223,56 +228,62 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
val fileSplits = collectFileSplits(partitionFilters, dataFilters)
- val partitionSchema = if (dropPartitionColumnsWhenWrite) {
- // when hoodie.datasource.write.drop.partition.columns is true,
partition columns can't be persisted in
- // data files.
- StructType(partitionColumns.map(StructField(_, StringType)))
- } else {
- StructType(Nil)
- }
- val tableSchema = HoodieTableSchema(tableStructSchema, if
(internalSchema.isEmptySchema) tableAvroSchema.toString else
AvroInternalSchemaConverter.convert(internalSchema,
tableAvroSchema.getName).toString, internalSchema)
- val dataSchema = if (dropPartitionColumnsWhenWrite) {
- val dataStructType = StructType(tableStructSchema.filterNot(f =>
partitionColumns.contains(f.name)))
- HoodieTableSchema(
- dataStructType,
- sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType,
nullable = false, "record").toString()
- )
- } else {
- tableSchema
- }
- val requiredSchema = if (dropPartitionColumnsWhenWrite) {
- val requiredStructType = StructType(requiredStructSchema.filterNot(f =>
partitionColumns.contains(f.name)))
- HoodieTableSchema(
- requiredStructType,
- sparkAdapter.getAvroSchemaConverters.toAvroType(requiredStructType,
nullable = false, "record").toString()
- )
+ val tableAvroSchemaStr =
+ if (internalSchema.isEmptySchema) tableAvroSchema.toString
+ else AvroInternalSchemaConverter.convert(internalSchema,
tableAvroSchema.getName).toString
+
+ val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr,
internalSchema)
+ val requiredSchema = HoodieTableSchema(requiredStructSchema,
requiredAvroSchema.toString, requiredInternalSchema)
+
+ // Since schema requested by the caller might contain partition columns,
we might need to
+ // prune it, removing all partition columns from it in case these columns
are not persisted
+ // in the data files
+ //
+ // NOTE: This partition schema is only relevant to file reader to be able
to embed
+ // values of partition columns (hereafter referred to as partition
values) encoded into
+ // the partition path, and omitted from the data file, back into
fetched rows;
+ // Note that, by default, partition columns are not omitted
therefore specifying
+ // partition schema for reader is not required
+ val (partitionSchema, dataSchema, prunedRequiredSchema) =
+ tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+ if (fileSplits.isEmpty) {
+ sparkSession.sparkContext.emptyRDD
} else {
- HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString,
requiredInternalSchema)
+ val rdd = composeRDD(fileSplits, partitionSchema, dataSchema,
prunedRequiredSchema, filters)
+
+ // NOTE: In case when partition columns have been pruned from the
required schema, we have to project
+ // the rows from the pruned schema back into the one expected by
the caller
+ val projectedRDD = if (prunedRequiredSchema.structTypeSchema !=
requiredSchema.structTypeSchema) {
+ rdd.mapPartitions { it =>
+ val fullPrunedSchema =
StructType(prunedRequiredSchema.structTypeSchema.fields ++
partitionSchema.fields)
+ val unsafeProjection = generateUnsafeProjection(fullPrunedSchema,
requiredSchema.structTypeSchema)
+ it.map(unsafeProjection)
+ }
+ } else {
+ rdd
+ }
+
+ // Here we rely on type erasure to work around the inherited API
restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
+ // Please check [[needConversion]] scala-doc for more details
+ projectedRDD.asInstanceOf[RDD[Row]]
}
- // Here we rely on a type erasure, to workaround inherited API restriction
and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
- // Please check [[needConversion]] scala-doc for more details
- if (fileSplits.nonEmpty)
- composeRDD(fileSplits, partitionSchema, dataSchema, requiredSchema,
filters).asInstanceOf[RDD[Row]]
- else
- sparkSession.sparkContext.emptyRDD
}
-
-
/**
* Composes RDD provided file splits to read from, table and partition
schemas, data filters to be applied
*
* @param fileSplits file splits to be handled by the RDD
* @param partitionSchema target table's partition schema
- * @param tableSchema target table's schema
+ * @param dataSchema target table's data files' schema
* @param requiredSchema projected schema required by the reader
* @param filters data filters to be applied
* @return instance of RDD (implementing [[HoodieUnsafeRDD]])
*/
protected def composeRDD(fileSplits: Seq[FileSplit],
partitionSchema: StructType,
- tableSchema: HoodieTableSchema,
+ dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]): HoodieUnsafeRDD
@@ -325,16 +336,8 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
}
protected final def appendMandatoryColumns(requestedColumns: Array[String]):
Array[String] = {
- if (dropPartitionColumnsWhenWrite) {
- if (requestedColumns.isEmpty) {
- mandatoryColumns.toArray
- } else {
- requestedColumns
- }
- } else {
- val missing = mandatoryColumns.filter(col =>
!requestedColumns.contains(col))
- requestedColumns ++ missing
- }
+ val missing = mandatoryColumns.filter(col =>
!requestedColumns.contains(col))
+ requestedColumns ++ missing
}
protected def getTableState: HoodieTableState = {
@@ -364,7 +367,7 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
protected def getPartitionColumnsAsInternalRow(file: FileStatus):
InternalRow = {
try {
val tableConfig = metaClient.getTableConfig
- if (dropPartitionColumnsWhenWrite) {
+ if (shouldOmitPartitionColumns) {
val relativePath = new URI(metaClient.getBasePath).relativize(new
URI(file.getPath.getParent.toString)).toString
val hiveStylePartitioningEnabled =
tableConfig.getHiveStylePartitioningEnable.toBoolean
if (hiveStylePartitioningEnabled) {
@@ -388,40 +391,47 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
InternalRow.empty
}
}
-}
-object HoodieBaseRelation {
-
- def getPartitionPath(fileStatus: FileStatus): Path =
- fileStatus.getPath.getParent
+ protected def getColName(f: StructField): String = {
+ if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
+ f.name
+ } else {
+ f.name.toLowerCase(Locale.ROOT)
+ }
+ }
/**
* Returns file-reader routine accepting [[PartitionedFile]] and returning
an [[Iterator]]
* over [[InternalRow]]
*/
- def createBaseFileReader(spark: SparkSession,
- partitionSchema: StructType,
- tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration): PartitionedFile =>
Iterator[InternalRow] = {
+ protected def createBaseFileReader(spark: SparkSession,
+ partitionSchema: StructType,
+ dataSchema: HoodieTableSchema,
+ requiredSchema: HoodieTableSchema,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
val hfileReader = createHFileReader(
spark = spark,
- tableSchema = tableSchema,
+ dataSchema = dataSchema,
requiredSchema = requiredSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf
)
+
+ // We're delegating to Spark to append partition values to every row only
in cases
+ // when these corresponding partition-values are not persisted w/in the
data file itself
+ val shouldAppendPartitionColumns = shouldOmitPartitionColumns
val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = spark,
- dataSchema = tableSchema.structTypeSchema,
+ dataSchema = dataSchema.structTypeSchema,
partitionSchema = partitionSchema,
requiredSchema = requiredSchema.structTypeSchema,
filters = filters,
options = options,
- hadoopConf = hadoopConf
+ hadoopConf = hadoopConf,
+ appendPartitionValues = shouldAppendPartitionColumns
)
partitionedFile => {
@@ -436,8 +446,38 @@ object HoodieBaseRelation {
}
}
+ private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
+ requiredSchema: HoodieTableSchema):
(StructType, HoodieTableSchema, HoodieTableSchema) = {
+ if (shouldOmitPartitionColumns) {
+ val partitionSchema = StructType(partitionColumns.map(StructField(_,
StringType)))
+ val prunedDataStructSchema =
prunePartitionColumns(tableSchema.structTypeSchema)
+ val prunedRequiredSchema =
prunePartitionColumns(requiredSchema.structTypeSchema)
+
+ (partitionSchema,
+ HoodieTableSchema(prunedDataStructSchema,
convertToAvroSchema(prunedDataStructSchema).toString),
+ HoodieTableSchema(prunedRequiredSchema,
convertToAvroSchema(prunedRequiredSchema).toString))
+ } else {
+ (StructType(Nil), tableSchema, requiredSchema)
+ }
+ }
+
+ private def prunePartitionColumns(dataStructSchema: StructType): StructType =
+ StructType(dataStructSchema.filterNot(f =>
partitionColumns.contains(f.name)))
+}
+
+object HoodieBaseRelation extends SparkAdapterSupport {
+
+ private def generateUnsafeProjection(from: StructType, to: StructType) =
+
sparkAdapter.createCatalystExpressionUtils().generateUnsafeProjection(from, to)
+
+ def convertToAvroSchema(structSchema: StructType): Schema =
+ sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable =
false, "Record")
+
+ def getPartitionPath(fileStatus: FileStatus): Path =
+ fileStatus.getPath.getParent
+
private def createHFileReader(spark: SparkSession,
- tableSchema: HoodieTableSchema,
+ dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index 02264bc4a6..1fc9e70a5a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -21,6 +21,7 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.SerDeHelper
import org.apache.spark.sql.SparkSession
@@ -38,8 +39,8 @@ object HoodieDataSourceHelper extends PredicateHelper with
SparkAdapterSupport {
/**
- * Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]]
- * to deal with [[ColumnarBatch]] when enable parquet vectorized reader if
necessary.
+ * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]]
handling [[ColumnarBatch]],
+ * when Parquet's Vectorized Reader is used
*/
def buildHoodieParquetReader(sparkSession: SparkSession,
dataSchema: StructType,
@@ -47,9 +48,11 @@ object HoodieDataSourceHelper extends PredicateHelper with
SparkAdapterSupport {
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
- hadoopConf: Configuration): PartitionedFile =>
Iterator[InternalRow] = {
+ hadoopConf: Configuration,
+ appendPartitionValues: Boolean = false):
PartitionedFile => Iterator[InternalRow] = {
- val readParquetFile: PartitionedFile => Iterator[Any] =
sparkAdapter.createHoodieParquetFileFormat().get.buildReaderWithPartitionValues(
+ val parquetFileFormat: ParquetFileFormat =
sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
+ val readParquetFile: PartitionedFile => Iterator[Any] =
parquetFileFormat.buildReaderWithPartitionValues(
sparkSession = sparkSession,
dataSchema = dataSchema,
partitionSchema = partitionSchema,
@@ -91,9 +94,12 @@ object HoodieDataSourceHelper extends PredicateHelper with
SparkAdapterSupport {
* @param validCommits valid commits, using give validCommits to validate
all legal histroy Schema files, and return the latest one.
*/
def getConfigurationWithInternalSchema(conf: Configuration, internalSchema:
InternalSchema, tablePath: String, validCommits: String): Configuration = {
- conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA,
SerDeHelper.toJson(internalSchema))
- conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
- conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST,
validCommits)
+ val querySchemaString = SerDeHelper.toJson(internalSchema)
+ if (!isNullOrEmpty(querySchemaString)) {
+ conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA,
SerDeHelper.toJson(internalSchema))
+ conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
+ conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST,
validCommits)
+ }
conf
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c86b1615ba..38062aa802 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -88,7 +88,7 @@ object HoodieSparkSqlWriter {
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams,
tableConfig)
val originKeyGeneratorClassName =
HoodieWriterUtils.getOriginKeyGenerator(parameters)
- val timestampKeyGeneratorConfigs =
extractConfigsRelatedToTimestmapBasedKeyGenerator(
+ val timestampKeyGeneratorConfigs =
extractConfigsRelatedToTimestampBasedKeyGenerator(
originKeyGeneratorClassName, parameters)
//validate datasource and tableconfig keygen are the same
validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
@@ -758,7 +758,7 @@ object HoodieSparkSqlWriter {
(params, HoodieWriterUtils.convertMapToHoodieConfig(params))
}
- private def extractConfigsRelatedToTimestmapBasedKeyGenerator(keyGenerator:
String,
+ private def extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenerator:
String,
params: Map[String, String]): Map[String, String] = {
if
(keyGenerator.equals(classOf[TimestampBasedKeyGenerator].getCanonicalName) ||
keyGenerator.equals(classOf[TimestampBasedAvroKeyGenerator].getCanonicalName)) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 039dafb596..d9d5812adb 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -20,8 +20,8 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieFileFormat,
HoodieRecord, HoodieReplaceCommitMetadata}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
-import java.util.stream.Collectors
+import java.util.stream.Collectors
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -36,6 +36,7 @@ import org.apache.hudi.table.HoodieSparkTable
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
+import
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
@@ -183,7 +184,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH,
metaClient.getBasePath)
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST,
validCommits)
val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
- case HoodieFileFormat.PARQUET => if (!internalSchema.isEmptySchema)
"HoodieParquet" else "parquet"
+ case HoodieFileFormat.PARQUET => HoodieParquetFileFormat.FILE_FORMAT_ID
case HoodieFileFormat.ORC => "orc"
}
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 46e395fc2b..6aa7007851 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -19,9 +19,7 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{GlobPattern, Path}
-import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -61,14 +59,14 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
protected override def composeRDD(fileSplits:
Seq[HoodieMergeOnReadFileSplit],
partitionSchema: StructType,
- tableSchema: HoodieTableSchema,
+ dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]):
HoodieMergeOnReadRDD = {
val fullSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
- tableSchema = tableSchema,
- requiredSchema = tableSchema,
+ dataSchema = dataSchema,
+ requiredSchema = dataSchema,
// This file-reader is used to read base file records, subsequently
merging them with the records
// stored in delta-log files. As such, we have to read _all_ records
from the base file, while avoiding
// applying any user-defined filtering _before_ we complete combining
them w/ delta-log records (to make sure that
@@ -86,7 +84,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
val requiredSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
- tableSchema = tableSchema,
+ dataSchema = dataSchema,
requiredSchema = requiredSchema,
filters = filters ++ incrementalSpanRecordFilters,
options = optParams,
@@ -99,7 +97,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
// TODO(HUDI-3639) implement incremental span record filtering w/in RDD to
make sure returned iterator is appropriately
// filtered, since file-reader might not be capable to
perform filtering
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf,
fullSchemaParquetReader, requiredSchemaParquetReader,
- tableSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
+ dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
}
override protected def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index d85788e25b..a88eb63036 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -20,17 +20,14 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
-import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
@@ -63,14 +60,14 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
protected override def composeRDD(fileSplits:
Seq[HoodieMergeOnReadFileSplit],
partitionSchema: StructType,
- tableSchema: HoodieTableSchema,
+ dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]):
HoodieMergeOnReadRDD = {
val fullSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
- tableSchema = tableSchema,
- requiredSchema = tableSchema,
+ dataSchema = dataSchema,
+ requiredSchema = dataSchema,
// This file-reader is used to read base file records, subsequently
merging them with the records
// stored in delta-log files. As such, we have to read _all_ records
from the base file, while avoiding
// applying any filtering _before_ we complete combining them w/
delta-log records (to make sure that
@@ -85,7 +82,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
val requiredSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
- tableSchema = tableSchema,
+ dataSchema = dataSchema,
requiredSchema = requiredSchema,
filters = filters,
options = optParams,
@@ -96,7 +93,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
val tableState = getTableState
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf,
fullSchemaParquetReader, requiredSchemaParquetReader,
- tableSchema, requiredSchema, tableState, mergeType, fileSplits)
+ dataSchema, requiredSchema, tableState, mergeType, fileSplits)
}
protected override def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 1305323bd1..cd1c1fb4af 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -120,6 +120,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name)))
}
+ /**
+ * @VisibleForTesting
+ */
def partitionSchema: StructType = {
if (queryAsNonePartitionedTable) {
// If we read it as Non-Partitioned table, we should not
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
similarity index 58%
rename from
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala
rename to
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
index 150178ea69..dbb62d089e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
@@ -23,26 +23,32 @@ import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
+import
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat.FILE_FORMAT_ID
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
-class SparkHoodieParquetFileFormat extends ParquetFileFormat with
SparkAdapterSupport {
- override def shortName(): String = "HoodieParquet"
+class HoodieParquetFileFormat extends ParquetFileFormat with
SparkAdapterSupport {
- override def toString: String = "HoodieParquet"
+ override def shortName(): String = FILE_FORMAT_ID
- override def buildReaderWithPartitionValues(
- sparkSession: SparkSession,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
+ override def toString: String = "Hoodie-Parquet"
+
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
sparkAdapter
- .createHoodieParquetFileFormat().get
+ .createHoodieParquetFileFormat(appendPartitionValues = false).get
.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchema, filters, options, hadoopConf)
}
}
+object HoodieParquetFileFormat {
+
+ val FILE_FORMAT_ID = "hoodie-parquet"
+
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index b232ef010f..28a6dcdcd6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -747,7 +747,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(resultSchema, schema1)
}
- @ParameterizedTest @ValueSource(booleans = Array(true, false))
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
def testCopyOnWriteWithDropPartitionColumns(enableDropPartitionColumns:
Boolean) {
val records1 =
recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000",
100)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -897,9 +898,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","))
}
- @Disabled("HUDI-3204")
- @Test
- def testHoodieBaseFileOnlyViewRelation(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testHoodieBaseFileOnlyViewRelation(useGlobbing: Boolean): Unit = {
val _spark = spark
import _spark.implicits._
@@ -925,18 +926,27 @@ class TestCOWDataSource extends HoodieClientTestBase {
.mode(org.apache.spark.sql.SaveMode.Append)
.save(basePath)
- val res = spark.read.format("hudi").load(basePath)
+ // NOTE: We're testing here that both paths are appropriately handling
+ // partition values, regardless of whether we're reading the table
+ // t/h a globbed path or not
+ val path = if (useGlobbing) {
+ s"$basePath/*/*/*/*"
+ } else {
+ basePath
+ }
+
+ val res = spark.read.format("hudi").load(path)
assert(res.count() == 2)
// data_date is the partition field. Persist to the parquet file using the
origin values, and read it.
assertEquals(
- res.select("data_date").map(_.get(0).toString).collect().sorted,
- Array("2018-09-23", "2018-09-24")
+ res.select("data_date").map(_.get(0).toString).collect().sorted.toSeq,
+ Seq("2018-09-23", "2018-09-24")
)
assertEquals(
-
res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted,
- Array("2018/09/23", "2018/09/24")
+
res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq,
+ Seq("2018/09/23", "2018/09/24")
)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index bd7edd4db5..48bb46f81b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -57,7 +57,6 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
val verificationCol: String = "driver"
val updatedVerificationVal: String = "driver_update"
- @Disabled("HUDI-3896")
@ParameterizedTest
@CsvSource(Array(
"true,org.apache.hudi.keygen.SimpleKeyGenerator",
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index e4b3c4010a..0e74c997d7 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join,
LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark24HoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{FilePartition,
PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
@@ -165,7 +165,7 @@ class Spark2Adapter extends SparkAdapter {
}
}
- override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
- Some(new ParquetFileFormat)
+ override def createHoodieParquetFileFormat(appendPartitionValues: Boolean):
Option[ParquetFileFormat] = {
+ Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
}
}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
new file mode 100644
index 0000000000..6fb5c50c03
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat,
ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
+import org.apache.spark.sql.catalyst.InternalRow
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.{PartitionedFile,
RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding
Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[ParquetFileFormat]] impl from Spark 2.4.4 w/
the following changes applied to it:
+ * <ol>
+ * <li>Avoiding appending partition values to the rows read from the data
file</li>
+ * </ol>
+ */
+class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues:
Boolean) extends ParquetFileFormat {
+
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
+ hadoopConf.set(
+ ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+ requiredSchema.json)
+ hadoopConf.set(
+ ParquetWriteSupport.SPARK_ROW_SCHEMA,
+ requiredSchema.json)
+ hadoopConf.set(
+ SQLConf.SESSION_LOCAL_TIMEZONE.key,
+ sparkSession.sessionState.conf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(
+ SQLConf.CASE_SENSITIVE.key,
+ sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+ ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+ // Sets flags for `ParquetToSparkSchemaConverter`
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_BINARY_AS_STRING.key,
+ sparkSession.sessionState.conf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+ sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
+
+ // TODO: if you move this into the closure it reverts to the default
values.
+ // If true, enable using the custom RecordReader for parquet. This only
works for
+ // a subset of the types (no complex types).
+ val resultSchema = StructType(partitionSchema.fields ++
requiredSchema.fields)
+ val sqlConf = sparkSession.sessionState.conf
+ val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+ val enableVectorizedReader: Boolean =
+ sqlConf.parquetVectorizedReaderEnabled &&
+ resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+ val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+ val timestampConversion: Boolean =
sqlConf.isParquetINT96TimestampConversion
+ val capacity = sqlConf.parquetVectorizedReaderBatchSize
+ val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+ // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+ val returningBatch = supportBatch(sparkSession, resultSchema)
+ val pushDownDate = sqlConf.parquetFilterPushDownDate
+ val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+ val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+ val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+ val pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold
+ val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+ (file: PartitionedFile) => {
+ assert(!shouldAppendPartitionValues || file.partitionValues.numFields ==
partitionSchema.size)
+
+ val fileSplit =
+ new FileSplit(new Path(new URI(file.filePath)), file.start,
file.length, Array.empty)
+ val filePath = fileSplit.getPath
+
+ val split =
+ new org.apache.parquet.hadoop.ParquetInputSplit(
+ filePath,
+ fileSplit.getStart,
+ fileSplit.getStart + fileSplit.getLength,
+ fileSplit.getLength,
+ fileSplit.getLocations,
+ null)
+
+ val sharedConf = broadcastedHadoopConf.value.value
+
+ lazy val footerFileMetaData =
+ ParquetFileReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(pushDownDate,
pushDownTimestamp, pushDownDecimal,
+ pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not
all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter(parquetSchema, _))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions
to int96 timestamps'
+ // *only* if the file was created by something other than "parquet-mr",
so check the actual
+ // writer here for this file. We have to do this per-file, as each file
in the table may
+ // have different writers.
+ // Define isCreatedByParquetMr as function to avoid unnecessary parquet
footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+
Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ } else {
+ None
+ }
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(broadcastedHadoopConf.value.value,
attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ if (pushed.isDefined) {
+
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
+ }
+ val taskContext = Option(TaskContext.get())
+ if (enableVectorizedReader) {
+ val vectorizedReader = new VectorizedParquetRecordReader(
+ convertTz.orNull, enableOffHeapColumnVector &&
taskContext.isDefined, capacity)
+ val iter = new RecordReaderIterator(vectorizedReader)
+ // SPARK-23457 Register a task completion listener before
`initialization`.
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ =>
iter.close()))
+ vectorizedReader.initialize(split, hadoopAttemptContext)
+ logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+
+ // NOTE: We're making appending of the partitioned values to the rows
read from the
+ // data file configurable
+ if (shouldAppendPartitionValues) {
+ vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+ } else {
+ vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
+ }
+
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
+ }
+
+ // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } else {
+ logDebug(s"Falling back to parquet-mr")
+ // ParquetRecordReader returns UnsafeRow
+ val reader = if (pushed.isDefined && enableRecordFilter) {
+ val parquetFilter = FilterCompat.get(pushed.get, null)
+ new ParquetRecordReader[UnsafeRow](new
ParquetReadSupport(convertTz), parquetFilter)
+ } else {
+ new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+ }
+ val iter = new RecordReaderIterator(reader)
+ // SPARK-23457 Register a task completion listener before
`initialization`.
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ =>
iter.close()))
+ reader.initialize(split, hadoopAttemptContext)
+
+ val fullSchema = requiredSchema.toAttributes ++
partitionSchema.toAttributes
+ val joinedRow = new JoinedRow()
+ val appendPartitionColumns =
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+ // This is a horrible erasure hack... if we type the iterator above,
then it actually checks
+ // the type in next() and we get a class cast exception. If we make
that function return
+ // Object, then we can defer the cast until later!
+ //
+ // NOTE: We're making appending of the partitioned values to the rows
read from the
+ // data file configurable
+ if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
+ // Either there are no partition columns, or appending partition values is disabled
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } else {
+ iter.asInstanceOf[Iterator[InternalRow]]
+ .map(d => appendPartitionColumns(joinedRow(d,
file.partitionValues)))
+ }
+
+ }
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index 13dba82488..cd5cd9c82f 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -19,14 +19,13 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
-import org.apache.spark.sql.avro.{HoodieAvroDeserializer,
HoodieAvroSchemaConverters, HoodieAvroSerializer,
HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer,
HoodieSparkAvroSchemaConverters}
-import org.apache.spark.sql.hudi.SparkAdapter
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils,
HoodieSpark3_1CatalystExpressionUtils}
import org.apache.spark.SPARK_VERSION
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.avro.{HoodieAvroDeserializer,
HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer,
HoodieSpark3_1AvroSerializer}
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark312HoodieParquetFileFormat}
+import org.apache.spark.sql.hudi.SparkAdapter
+import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils,
HoodieSpark3_1CatalystExpressionUtils, SparkSession}
/**
@@ -55,14 +54,7 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
}
}
- override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
- if (SPARK_VERSION.startsWith("3.1")) {
- val loadClassName =
"org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat"
- val clazz = Class.forName(loadClassName, true,
Thread.currentThread().getContextClassLoader)
- val ctor = clazz.getConstructors.head
- Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
- } else {
- None
- }
+ override def createHoodieParquetFileFormat(appendPartitionValues: Boolean):
Option[ParquetFileFormat] = {
+ Some(new Spark312HoodieParquetFileFormat(appendPartitionValues))
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
index 83b3162bbc..769373866f 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
@@ -17,279 +17,312 @@
package org.apache.spark.sql.execution.datasources.parquet
-import java.net.URI
-import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat,
ParquetRecordReader}
-
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import
org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters,
pruneInternalSchema, rebuildFilterFromParquet}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils,
PartitionedFile, RecordReaderIterator}
-import org.apache.spark.sql.execution.datasources.parquet._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField,
StructType}
import org.apache.spark.util.SerializableConfiguration
-class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
-
- // reference ParquetFileFormat from spark project
- override def buildReaderWithPartitionValues(
- sparkSession: SparkSession,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
- if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA,
"").isEmpty) {
- // fallback to origin parquet File read
- super.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchema, filters, options, hadoopConf)
- } else {
- hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
- hadoopConf.set(
- ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
- requiredSchema.json)
- hadoopConf.set(
- ParquetWriteSupport.SPARK_ROW_SCHEMA,
- requiredSchema.json)
- hadoopConf.set(
- SQLConf.SESSION_LOCAL_TIMEZONE.key,
- sparkSession.sessionState.conf.sessionLocalTimeZone)
- hadoopConf.setBoolean(
- SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
- sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
- hadoopConf.setBoolean(
- SQLConf.CASE_SENSITIVE.key,
- sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
- ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
-
- // Sets flags for `ParquetToSparkSchemaConverter`
- hadoopConf.setBoolean(
- SQLConf.PARQUET_BINARY_AS_STRING.key,
- sparkSession.sessionState.conf.isParquetBinaryAsString)
- hadoopConf.setBoolean(
- SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
- sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
- // for dataSource v1, we have no method to do project for spark physical
plan.
- // it's safe to do cols project here.
- val internalSchemaString =
hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
- val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
- if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
- val prunedSchema =
SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema,
querySchemaOption.get())
- hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA,
SerDeHelper.toJson(prunedSchema))
+import java.net.URI
+
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding
Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[ParquetFileFormat]] impl from Spark 3.1.2 w/
the following changes applied to it:
+ * <ol>
+ * <li>Avoiding appending partition values to the rows read from the data
file</li>
+ * <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues:
Boolean) extends ParquetFileFormat {
+
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
+ hadoopConf.set(
+ ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+ requiredSchema.json)
+ hadoopConf.set(
+ ParquetWriteSupport.SPARK_ROW_SCHEMA,
+ requiredSchema.json)
+ hadoopConf.set(
+ SQLConf.SESSION_LOCAL_TIMEZONE.key,
+ sparkSession.sessionState.conf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(
+ SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+ sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+ hadoopConf.setBoolean(
+ SQLConf.CASE_SENSITIVE.key,
+ sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+ ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+ // Sets flags for `ParquetToSparkSchemaConverter`
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_BINARY_AS_STRING.key,
+ sparkSession.sessionState.conf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+ sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+ val internalSchemaStr =
hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+ // For Spark DataSource v1, there's no Physical Plan projection/schema
pruning w/in Spark itself,
+ // therefore it's safe to do schema projection here
+ if (!isNullOrEmpty(internalSchemaStr)) {
+ val prunedInternalSchemaStr =
+ pruneInternalSchema(internalSchemaStr, requiredSchema)
+ hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA,
prunedInternalSchemaStr)
+ }
+
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
+
+ // TODO: if you move this into the closure it reverts to the default
values.
+ // If true, enable using the custom RecordReader for parquet. This only
works for
+ // a subset of the types (no complex types).
+ val resultSchema = StructType(partitionSchema.fields ++
requiredSchema.fields)
+ val sqlConf = sparkSession.sessionState.conf
+ val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+ val enableVectorizedReader: Boolean =
+ sqlConf.parquetVectorizedReaderEnabled &&
+ resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+ val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+ val timestampConversion: Boolean =
sqlConf.isParquetINT96TimestampConversion
+ val capacity = sqlConf.parquetVectorizedReaderBatchSize
+ val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+ // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+ val returningBatch = supportBatch(sparkSession, resultSchema)
+ val pushDownDate = sqlConf.parquetFilterPushDownDate
+ val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+ val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+ val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+ val pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold
+ val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+ (file: PartitionedFile) => {
+ assert(!shouldAppendPartitionValues || file.partitionValues.numFields ==
partitionSchema.size)
+
+ val filePath = new Path(new URI(file.filePath))
+ val split =
+ new org.apache.parquet.hadoop.ParquetInputSplit(
+ filePath,
+ file.start,
+ file.start + file.length,
+ file.length,
+ Array.empty,
+ null)
+
+ val sharedConf = broadcastedHadoopConf.value.value
+
+ // Fetch internal schema
+ val internalSchemaStr =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+ // Internal schema has to be pruned at this point
+ val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+
+ val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) &&
querySchemaOption.isPresent
+
+ val tablePath =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+ val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+ val fileSchema = if (shouldUseInternalSchema) {
+ val validCommits =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+ InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime,
tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+ } else {
+ null
}
- val broadcastedHadoopConf =
- sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
-
- // TODO: if you move this into the closure it reverts to the default
values.
- // If true, enable using the custom RecordReader for parquet. This only
works for
- // a subset of the types (no complex types).
- val resultSchema = StructType(partitionSchema.fields ++
requiredSchema.fields)
- val sqlConf = sparkSession.sessionState.conf
- val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
- val enableVectorizedReader: Boolean =
- sqlConf.parquetVectorizedReaderEnabled &&
- resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
- val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
- val timestampConversion: Boolean =
sqlConf.isParquetINT96TimestampConversion
- val capacity = sqlConf.parquetVectorizedReaderBatchSize
- val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
- // Whole stage codegen (PhysicalRDD) is able to deal with batches
directly
- val returningBatch = supportBatch(sparkSession, resultSchema)
- val pushDownDate = sqlConf.parquetFilterPushDownDate
- val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
- val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
- val pushDownStringStartWith =
sqlConf.parquetFilterPushDownStringStartWith
- val pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold
- val isCaseSensitive = sqlConf.caseSensitiveAnalysis
-
- (file: PartitionedFile) => {
- assert(file.partitionValues.numFields == partitionSchema.size)
- val filePath = new Path(new URI(file.filePath))
- val split =
- new org.apache.parquet.hadoop.ParquetInputSplit(
- filePath,
- file.start,
- file.start + file.length,
- file.length,
- Array.empty,
- null)
- val sharedConf = broadcastedHadoopConf.value.value
- // do deal with internalSchema
- val internalSchemaString =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
- // querySchema must be a pruned schema.
- val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
- val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty ||
!querySchemaOption.isPresent) false else true
- val tablePath =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
- val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
- val fileSchema = if (internalSchemaChangeEnabled) {
- val validCommits =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
- InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime,
tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+
+ lazy val footerFileMetaData =
+ ParquetFileReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
+ createParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringStartWith,
+ pushDownInFilterThreshold,
+ isCaseSensitive,
+ datetimeRebaseMode)
} else {
- // this should not happened, searchSchemaAndCache will deal with
correctly.
- null
+ createParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringStartWith,
+ pushDownInFilterThreshold,
+ isCaseSensitive)
}
+ filters.map(rebuildFilterFromParquet(_, fileSchema,
querySchemaOption.orElse(null)))
+ // Collects all converted Parquet filter predicates. Notice that not
all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter)
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
- lazy val footerFileMetaData =
- ParquetFileReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
- val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
- footerFileMetaData.getKeyValueMetaData.get,
- SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
- // Try to push down filters when filter push-down is enabled.
- val pushed = if (enableParquetFilterPushDown) {
- val parquetSchema = footerFileMetaData.getSchema
- val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
- Spark312HoodieParquetFileFormat.createParquetFilters(
- parquetSchema,
- pushDownDate,
- pushDownTimestamp,
- pushDownDecimal,
- pushDownStringStartWith,
- pushDownInFilterThreshold,
- isCaseSensitive,
- datetimeRebaseMode)
- } else {
- Spark312HoodieParquetFileFormat.createParquetFilters(
- parquetSchema,
- pushDownDate,
- pushDownTimestamp,
- pushDownDecimal,
- pushDownStringStartWith,
- pushDownInFilterThreshold,
- isCaseSensitive)
- }
-
filters.map(Spark312HoodieParquetFileFormat.rebuildFilterFromParquet(_,
fileSchema, querySchemaOption.get()))
- // Collects all converted Parquet filter predicates. Notice that
not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
- // is used here.
- .flatMap(parquetFilters.createFilter(_))
- .reduceOption(FilterApi.and)
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions
to int96 timestamps'
+ // *only* if the file was created by something other than "parquet-mr",
so check the actual
+ // writer here for this file. We have to do this per-file, as each file
in the table may
+ // have different writers.
+ // Define isCreatedByParquetMr as function to avoid unnecessary parquet
footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}
- // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone
conversions to int96 timestamps'
- // *only* if the file was created by something other than
"parquet-mr", so check the actual
- // writer here for this file. We have to do this per-file, as each
file in the table may
- // have different writers.
- // Define isCreatedByParquetMr as function to avoid unnecessary
parquet footer reads.
- def isCreatedByParquetMr: Boolean =
- footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-
- val convertTz =
- if (timestampConversion && !isCreatedByParquetMr) {
-
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
+
+ // Clone the broadcasted Hadoop conf, so that per-file changes stay local to this read
+ val hadoopAttemptConf = new
Configuration(broadcastedHadoopConf.value.value)
+ var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] =
new java.util.HashMap()
+ if (shouldUseInternalSchema) {
+ val mergedInternalSchema = new InternalSchemaMerger(fileSchema,
querySchemaOption.get(), true, true).mergeSchema()
+ val mergedSchema =
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
+ typeChangeInfos =
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(),
mergedInternalSchema)
+ hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
mergedSchema.json)
+ }
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ if (pushed.isDefined) {
+
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
+ }
+ val taskContext = Option(TaskContext.get())
+ if (enableVectorizedReader) {
+ val vectorizedReader =
+ if (shouldUseInternalSchema) {
+ new Spark312HoodieVectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseMode.toString,
+ int96RebaseMode.toString,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity,
+ typeChangeInfos)
} else {
- None
+ new VectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseMode.toString,
+ int96RebaseMode.toString,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
}
- val int96RebaseMode = DataSourceUtils.int96RebaseMode(
- footerFileMetaData.getKeyValueMetaData.get,
-
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
-
- val attemptId = new TaskAttemptID(new TaskID(new JobID(),
TaskType.MAP, 0), 0)
- // use new conf
- val hadoopAttempConf = new
Configuration(broadcastedHadoopConf.value.value)
- //
- // reset request schema
- var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]]
= new java.util.HashMap()
- if (internalSchemaChangeEnabled) {
- val mergedInternalSchema = new InternalSchemaMerger(fileSchema,
querySchemaOption.get(), true, true).mergeSchema()
- val mergedSchema =
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
- typeChangeInfos =
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(),
mergedInternalSchema)
- hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
mergedSchema.json)
- }
- val hadoopAttemptContext =
- new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
- // Try to push down filters when filter push-down is enabled.
- // Notice: This push-down is RowGroups level, not individual records.
- if (pushed.isDefined) {
-
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
- }
- val taskContext = Option(TaskContext.get())
- if (enableVectorizedReader) {
- val vectorizedReader = new
Spark312HoodieVectorizedParquetRecordReader(
- convertTz.orNull,
- datetimeRebaseMode.toString,
- int96RebaseMode.toString,
- enableOffHeapColumnVector && taskContext.isDefined,
- capacity, typeChangeInfos)
- val iter = new RecordReaderIterator(vectorizedReader)
- // SPARK-23457 Register a task completion listener before
`initialization`.
- taskContext.foreach(_.addTaskCompletionListener[Unit](_ =>
iter.close()))
- vectorizedReader.initialize(split, hadoopAttemptContext)
+ val iter = new RecordReaderIterator(vectorizedReader)
+ // SPARK-23457 Register a task completion listener before
`initialization`.
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ =>
iter.close()))
+ vectorizedReader.initialize(split, hadoopAttemptContext)
+
+ // NOTE: We're making appending of the partitioned values to the rows
read from the
+ // data file configurable
+ if (shouldAppendPartitionValues) {
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
- if (returningBatch) {
- vectorizedReader.enableReturningBatches()
- }
+ } else {
+ vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
+ }
+
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
+ }
- // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
- iter.asInstanceOf[Iterator[InternalRow]]
+ // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } else {
+ logDebug(s"Falling back to parquet-mr")
+ // ParquetRecordReader returns InternalRow
+ val readSupport = new ParquetReadSupport(
+ convertTz,
+ enableVectorizedReader = false,
+ datetimeRebaseMode,
+ int96RebaseMode)
+ val reader = if (pushed.isDefined && enableRecordFilter) {
+ val parquetFilter = FilterCompat.get(pushed.get, null)
+ new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
- logDebug(s"Falling back to parquet-mr")
- // ParquetRecordReader returns InternalRow
- val readSupport = new ParquetReadSupport(
- convertTz,
- enableVectorizedReader = false,
- datetimeRebaseMode,
- int96RebaseMode)
- val reader = if (pushed.isDefined && enableRecordFilter) {
- val parquetFilter = FilterCompat.get(pushed.get, null)
- new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
- } else {
- new ParquetRecordReader[InternalRow](readSupport)
- }
- val iter = new RecordReaderIterator[InternalRow](reader)
- // SPARK-23457 Register a task completion listener before
`initialization`.
- taskContext.foreach(_.addTaskCompletionListener[Unit](_ =>
iter.close()))
- reader.initialize(split, hadoopAttemptContext)
-
- val fullSchema = requiredSchema.toAttributes ++
partitionSchema.toAttributes
- val unsafeProjection = if (typeChangeInfos.isEmpty) {
- GenerateUnsafeProjection.generate(fullSchema, fullSchema)
- } else {
- // find type changed.
- val newFullSchema = new
StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
- if (typeChangeInfos.containsKey(i)) {
- StructField(f.name, typeChangeInfos.get(i).getRight,
f.nullable, f.metadata)
- } else f
- }).toAttributes ++ partitionSchema.toAttributes
- val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
- if (typeChangeInfos.containsKey(i)) {
- Cast(attr, typeChangeInfos.get(i).getLeft)
- } else attr
- }
- GenerateUnsafeProjection.generate(castSchema, newFullSchema)
- }
+ new ParquetRecordReader[InternalRow](readSupport)
+ }
+ val iter = new RecordReaderIterator[InternalRow](reader)
+ // SPARK-23457 Register a task completion listener before
`initialization`.
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ =>
iter.close()))
+ reader.initialize(split, hadoopAttemptContext)
- if (partitionSchema.length == 0) {
- // There is no partition columns
- iter.map(unsafeProjection)
- } else {
- val joinedRow = new JoinedRow()
- iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+ val fullSchema = requiredSchema.toAttributes ++
partitionSchema.toAttributes
+ val unsafeProjection = if (typeChangeInfos.isEmpty) {
+ GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+ } else {
+ // find type changed.
+ val newFullSchema = new
StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
+ if (typeChangeInfos.containsKey(i)) {
+ StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable,
f.metadata)
+ } else f
+ }).toAttributes ++ partitionSchema.toAttributes
+ val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
+ if (typeChangeInfos.containsKey(i)) {
+ Cast(attr, typeChangeInfos.get(i).getLeft)
+ } else attr
}
+ GenerateUnsafeProjection.generate(castSchema, newFullSchema)
+ }
+
+ // NOTE: We're making appending of the partitioned values to the rows
read from the
+ // data file configurable
+ if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
+ // Either partition values must not be appended, or there are no partition columns
+ iter.map(unsafeProjection)
+ } else {
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
}
}
@@ -300,6 +333,16 @@ object Spark312HoodieParquetFileFormat {
val PARQUET_FILTERS_CLASS_NAME =
"org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
+ def pruneInternalSchema(internalSchemaStr: String, requiredSchema:
StructType): String = {
+ val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+ if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
+ val prunedSchema =
SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema,
querySchemaOption.get())
+ SerDeHelper.toJson(prunedSchema)
+ } else {
+ internalSchemaStr
+ }
+ }
+
private def createParquetFilters(arg: Any*): ParquetFilters = {
val clazz = Class.forName(PARQUET_FILTERS_CLASS_NAME, true,
Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
diff --git
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index bad392b4f9..15624c7411 100644
---
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark32HoodieParquetFileFormat}
import org.apache.spark.sql.parser.HoodieSpark3_2ExtendedSqlParser
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils,
HoodieSpark3_2CatalystExpressionUtils, SparkSession}
@@ -80,14 +80,7 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
}
}
- override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
- if (SPARK_VERSION.startsWith("3.2")) {
- val loadClassName =
"org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat"
- val clazz = Class.forName(loadClassName, true,
Thread.currentThread().getContextClassLoader)
- val ctor = clazz.getConstructors.head
- Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
- } else {
- None
- }
+ override def createHoodieParquetFileFormat(appendPartitionValues: Boolean):
Option[ParquetFileFormat] = {
+ Some(new Spark32HoodieParquetFileFormat(appendPartitionValues))
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index 28db473965..f2a0a21df8 100644
---
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet
-import java.net.URI
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileSplit
@@ -27,6 +25,7 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID,
TaskID, TaskType}
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
@@ -34,226 +33,266 @@ import
org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat,
ParquetRecordReader}
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import
org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema,
rebuildFilterFromParquet}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils,
PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField,
StructType}
import org.apache.spark.util.SerializableConfiguration
-class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
-
- // reference ParquetFileFormat from spark project
- override def buildReaderWithPartitionValues(
- sparkSession: SparkSession,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
- if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA,
"").isEmpty) {
- // fallback to origin parquet File read
- super.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchema, filters, options, hadoopConf)
- } else {
- hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
- hadoopConf.set(
- ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
- requiredSchema.json)
- hadoopConf.set(
- ParquetWriteSupport.SPARK_ROW_SCHEMA,
- requiredSchema.json)
- hadoopConf.set(
- SQLConf.SESSION_LOCAL_TIMEZONE.key,
- sparkSession.sessionState.conf.sessionLocalTimeZone)
- hadoopConf.setBoolean(
- SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
- sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
- hadoopConf.setBoolean(
- SQLConf.CASE_SENSITIVE.key,
- sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
- ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
-
- // Sets flags for `ParquetToSparkSchemaConverter`
- hadoopConf.setBoolean(
- SQLConf.PARQUET_BINARY_AS_STRING.key,
- sparkSession.sessionState.conf.isParquetBinaryAsString)
- hadoopConf.setBoolean(
- SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
- sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
- // for dataSource v1, we have no method to do project for spark physical
plan.
- // it's safe to do cols project here.
- val internalSchemaString =
hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
- val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
- if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
- val prunedSchema =
SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema,
querySchemaOption.get())
- hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA,
SerDeHelper.toJson(prunedSchema))
+import java.net.URI
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding
Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[ParquetFileFormat]] impl from Spark 3.2.1 w/
the following changes applied to it:
+ * <ol>
+ * <li>Avoiding appending partition values to the rows read from the data
file</li>
+ * <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues:
Boolean) extends ParquetFileFormat {
+
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
+ hadoopConf.set(
+ ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+ requiredSchema.json)
+ hadoopConf.set(
+ ParquetWriteSupport.SPARK_ROW_SCHEMA,
+ requiredSchema.json)
+ hadoopConf.set(
+ SQLConf.SESSION_LOCAL_TIMEZONE.key,
+ sparkSession.sessionState.conf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(
+ SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+ sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+ hadoopConf.setBoolean(
+ SQLConf.CASE_SENSITIVE.key,
+ sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+ ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+ // Sets flags for `ParquetToSparkSchemaConverter`
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_BINARY_AS_STRING.key,
+ sparkSession.sessionState.conf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+ sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+ val internalSchemaStr =
hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+ // For Spark DataSource v1, there's no Physical Plan projection/schema
pruning w/in Spark itself,
+ // therefore it's safe to do schema projection here
+ if (!isNullOrEmpty(internalSchemaStr)) {
+ val prunedInternalSchemaStr =
+ pruneInternalSchema(internalSchemaStr, requiredSchema)
+ hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA,
prunedInternalSchemaStr)
+ }
+
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
+
+ // TODO: if you move this into the closure it reverts to the default
values.
+ // If true, enable using the custom RecordReader for parquet. This only
works for
+ // a subset of the types (no complex types).
+ val resultSchema = StructType(partitionSchema.fields ++
requiredSchema.fields)
+ val sqlConf = sparkSession.sessionState.conf
+ val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+ val enableVectorizedReader: Boolean =
+ sqlConf.parquetVectorizedReaderEnabled &&
+ resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+ val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+ val timestampConversion: Boolean =
sqlConf.isParquetINT96TimestampConversion
+ val capacity = sqlConf.parquetVectorizedReaderBatchSize
+ val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+ // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+ val returningBatch = supportBatch(sparkSession, resultSchema)
+ val pushDownDate = sqlConf.parquetFilterPushDownDate
+ val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+ val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+ val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+ val pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold
+ val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+ val parquetOptions = new ParquetOptions(options,
sparkSession.sessionState.conf)
+ val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+ val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+
+ (file: PartitionedFile) => {
+ assert(!shouldAppendPartitionValues || file.partitionValues.numFields ==
partitionSchema.size)
+
+ val filePath = new Path(new URI(file.filePath))
+ val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
+
+ val sharedConf = broadcastedHadoopConf.value.value
+
+ // Fetch internal schema
+ val internalSchemaStr =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+ // Internal schema has to be pruned at this point
+ val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+
+ val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) &&
querySchemaOption.isPresent
+
+ val tablePath =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+ val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+ val fileSchema = if (shouldUseInternalSchema) {
+ val validCommits =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+ InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime,
tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+ } else {
+ null
}
- val broadcastedHadoopConf =
- sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
-
- // TODO: if you move this into the closure it reverts to the default
values.
- // If true, enable using the custom RecordReader for parquet. This only
works for
- // a subset of the types (no complex types).
- val resultSchema = StructType(partitionSchema.fields ++
requiredSchema.fields)
- val sqlConf = sparkSession.sessionState.conf
- val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
- val enableVectorizedReader: Boolean =
- sqlConf.parquetVectorizedReaderEnabled &&
- resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
- val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
- val timestampConversion: Boolean =
sqlConf.isParquetINT96TimestampConversion
- val capacity = sqlConf.parquetVectorizedReaderBatchSize
- val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
- // Whole stage codegen (PhysicalRDD) is able to deal with batches
directly
- val returningBatch = supportBatch(sparkSession, resultSchema)
- val pushDownDate = sqlConf.parquetFilterPushDownDate
- val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
- val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
- val pushDownStringStartWith =
sqlConf.parquetFilterPushDownStringStartWith
- val pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold
- val isCaseSensitive = sqlConf.caseSensitiveAnalysis
- val parquetOptions = new ParquetOptions(options,
sparkSession.sessionState.conf)
- val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
- val int96RebaseModeInread = parquetOptions.int96RebaseModeInRead
-
- (file: PartitionedFile) => {
- assert(file.partitionValues.numFields == partitionSchema.size)
- val filePath = new Path(new URI(file.filePath))
- val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
- val sharedConf = broadcastedHadoopConf.value.value
- // do deal with internalSchema
- val internalSchemaString =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
- // querySchema must be a pruned schema.
- val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
- val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty ||
!querySchemaOption.isPresent) false else true
- val tablePath =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
- val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
- val fileSchema = if (internalSchemaChangeEnabled) {
- val validCommits =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
- InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime,
tablePath, sharedConf, if (validCommits == null) "" else validCommits)
- } else {
- // this should not happened, searchSchemaAndCache will deal with
correctly.
- null
- }
- lazy val footerFileMetaData =
- ParquetFooterReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
- val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
- footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
- // Try to push down filters when filter push-down is enabled.
- val pushed = if (enableParquetFilterPushDown) {
- val parquetSchema = footerFileMetaData.getSchema
- val parquetFilters = new ParquetFilters(
- parquetSchema,
- pushDownDate,
- pushDownTimestamp,
- pushDownDecimal,
- pushDownStringStartWith,
- pushDownInFilterThreshold,
- isCaseSensitive,
- datetimeRebaseSpec)
-
filters.map(Spark32HoodieParquetFileFormat.rebuildFilterFromParquet(_,
fileSchema, querySchemaOption.get()))
- // Collects all converted Parquet filter predicates. Notice that
not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
- // is used here.
- .flatMap(parquetFilters.createFilter(_))
- .reduceOption(FilterApi.and)
+ lazy val footerFileMetaData =
+ ParquetFooterReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
+ val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringStartWith,
+ pushDownInFilterThreshold,
+ isCaseSensitive,
+ datetimeRebaseSpec)
+ filters.map(rebuildFilterFromParquet(_, fileSchema,
querySchemaOption.orElse(null)))
+ // Collects all converted Parquet filter predicates. Notice that not
all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter)
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions
to int96 timestamps
+ // *only* if the file was created by something other than "parquet-mr",
so check the actual
+ // writer here for this file. We have to do this per-file, as each file
in the table may
+ // have different writers.
+ // Define isCreatedByParquetMr as function to avoid unnecessary parquet
footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}
- // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone
conversions to int96 timestamps'
- // *only* if the file was created by something other than
"parquet-mr", so check the actual
- // writer here for this file. We have to do this per-file, as each
file in the table may
- // have different writers.
- // Define isCreatedByParquetMr as function to avoid unnecessary
parquet footer reads.
- def isCreatedByParquetMr: Boolean =
- footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-
- val convertTz =
- if (timestampConversion && !isCreatedByParquetMr) {
-
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
+
+ // Clone the broadcasted Hadoop conf, so that per-file changes stay local to this read
+ val hadoopAttemptConf = new
Configuration(broadcastedHadoopConf.value.value)
+ var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] =
new java.util.HashMap()
+ if (shouldUseInternalSchema) {
+ val mergedInternalSchema = new InternalSchemaMerger(fileSchema,
querySchemaOption.get(), true, true).mergeSchema()
+ val mergedSchema =
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
+ typeChangeInfos =
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(),
mergedInternalSchema)
+ hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
mergedSchema.json)
+ }
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ if (pushed.isDefined) {
+
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
+ }
+ val taskContext = Option(TaskContext.get())
+ if (enableVectorizedReader) {
+ val vectorizedReader =
+ if (shouldUseInternalSchema) {
+ new Spark32HoodieVectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseSpec.mode.toString,
+ datetimeRebaseSpec.timeZone,
+ int96RebaseSpec.mode.toString,
+ int96RebaseSpec.timeZone,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity,
+ typeChangeInfos)
} else {
- None
+ new VectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseSpec.mode.toString,
+ datetimeRebaseSpec.timeZone,
+ int96RebaseSpec.mode.toString,
+ int96RebaseSpec.timeZone,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
}
- val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
- footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInread)
-
- val attemptId = new TaskAttemptID(new TaskID(new JobID(),
TaskType.MAP, 0), 0)
- // use new conf
- val hadoopAttempConf = new
Configuration(broadcastedHadoopConf.value.value)
+ // SPARK-37089: We cannot register a task completion listener to close
this iterator here
+ // because downstream exec nodes have already registered their
listeners. Since listeners
+ // are executed in reverse order of registration, a listener
registered here would close the
+ // iterator while downstream exec nodes are still running. When
off-heap column vectors are
+ // enabled, this can cause a use-after-free bug leading to a segfault.
//
- // reset request schema
- var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]]
= new java.util.HashMap()
- if (internalSchemaChangeEnabled) {
- val mergedInternalSchema = new InternalSchemaMerger(fileSchema,
querySchemaOption.get(), true, true).mergeSchema()
- val mergedSchema =
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
- typeChangeInfos =
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(),
mergedInternalSchema)
- hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
mergedSchema.json)
- }
- val hadoopAttemptContext =
- new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
+ // Instead, we use FileScanRDD's task completion listener to close
this iterator.
+ val iter = new RecordReaderIterator(vectorizedReader)
+ try {
+ vectorizedReader.initialize(split, hadoopAttemptContext)
- // Try to push down filters when filter push-down is enabled.
- // Notice: This push-down is RowGroups level, not individual records.
- if (pushed.isDefined) {
-
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
- }
- val taskContext = Option(TaskContext.get())
- if (enableVectorizedReader) {
- val vectorizedReader = new
Spark32HoodieVectorizedParquetRecordReader(
- convertTz.orNull,
- datetimeRebaseSpec.mode.toString,
- datetimeRebaseSpec.timeZone,
- int96RebaseSpec.mode.toString,
- int96RebaseSpec.timeZone,
- enableOffHeapColumnVector && taskContext.isDefined,
- capacity, typeChangeInfos)
- val iter = new RecordReaderIterator(vectorizedReader)
- // SPARK-23457 Register a task completion listener before
`initialization`.
- // taskContext.foreach(_.addTaskCompletionListener[Unit](_
=> iter.close()))
- try {
- vectorizedReader.initialize(split, hadoopAttemptContext)
+ // NOTE: We're making appending of the partitioned values to the
rows read from the
+ // data file configurable
+ if (shouldAppendPartitionValues) {
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
- if (returningBatch) {
- vectorizedReader.enableReturningBatches()
- }
+ } else {
+ vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
+ }
- // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
- iter.asInstanceOf[Iterator[InternalRow]]
- } catch {
- case e: Throwable =>
- // SPARK-23457: In case there is an exception in initialization,
close the iterator to
- // avoid leaking resources.
- iter.close()
- throw e
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
}
+
+ // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization,
close the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
+ }
+ } else {
+ logDebug(s"Falling back to parquet-mr")
+ // ParquetRecordReader returns InternalRow
+ val readSupport = new ParquetReadSupport(
+ convertTz,
+ enableVectorizedReader = false,
+ datetimeRebaseSpec,
+ int96RebaseSpec)
+ val reader = if (pushed.isDefined && enableRecordFilter) {
+ val parquetFilter = FilterCompat.get(pushed.get, null)
+ new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
- logDebug(s"Falling back to parquet-mr")
- // ParquetRecordReader returns InternalRow
- val readSupport = new ParquetReadSupport(
- convertTz,
- enableVectorizedReader = false,
- datetimeRebaseSpec,
- int96RebaseSpec)
- val reader = if (pushed.isDefined && enableRecordFilter) {
- val parquetFilter = FilterCompat.get(pushed.get, null)
- new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
- } else {
- new ParquetRecordReader[InternalRow](readSupport)
- }
- val iter = new RecordReaderIterator[InternalRow](reader)
- // SPARK-23457 Register a task completion listener before
`initialization`.
- taskContext.foreach(_.addTaskCompletionListener[Unit](_ =>
iter.close()))
+ new ParquetRecordReader[InternalRow](readSupport)
+ }
+ val iter = new RecordReaderIterator[InternalRow](reader)
+ try {
reader.initialize(split, hadoopAttemptContext)
val fullSchema = requiredSchema.toAttributes ++
partitionSchema.toAttributes
@@ -274,13 +313,21 @@ class Spark32HoodieParquetFileFormat extends
ParquetFileFormat {
GenerateUnsafeProjection.generate(castSchema, newFullSchema)
}
- if (partitionSchema.length == 0) {
+ // NOTE: We're making appending of the partitioned values to the
rows read from the
+ // data file configurable
+ if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
// There is no partition columns
iter.map(unsafeProjection)
} else {
val joinedRow = new JoinedRow()
iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization,
close the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
}
}
}
@@ -289,6 +336,16 @@ class Spark32HoodieParquetFileFormat extends
ParquetFileFormat {
object Spark32HoodieParquetFileFormat {
+ def pruneInternalSchema(internalSchemaStr: String, requiredSchema:
StructType): String = {
+ val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+ if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
+ val prunedSchema =
SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema,
querySchemaOption.get())
+ SerDeHelper.toJson(prunedSchema)
+ } else {
+ internalSchemaStr
+ }
+ }
+
private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema:
InternalSchema, querySchema: InternalSchema): Filter = {
if (fileSchema == null || querySchema == null) {
oldFilter