yihua commented on code in PR #8397:
URL: https://github.com/apache/hudi/pull/8397#discussion_r1185708463
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java:
##########
@@ -159,27 +151,26 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
* Do schema evolution for ParquetFormat.
*/
public void doEvolutionForParquetFormat() {
- if (internalSchemaOption.isPresent()) {
+ List<String> requiredColumns = getRequireColumn(job);
+ // No need trigger schema evolution for count(*)/count(1) operation
+ boolean disableSchemaEvolution = requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
+ if (!disableSchemaEvolution) {
+ if (!internalSchemaOption.isPresent()) {
Review Comment:
Should this condition be `internalSchemaOption == null` since it may not be
initialized?
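
The hazard this comment points at can be sketched with a minimal stand-in (`java.util.Optional` used here in place of Hudi's own `Option` type; the class and field names are hypothetical):

```java
import java.util.Optional;

// Hypothetical stand-in: an Optional-typed field that may never be assigned.
public class NullSafePresence {
  // Stays null until something initializes it, mirroring the concern above.
  static Optional<String> internalSchemaOption;

  // Checking == null first avoids the NullPointerException that a bare
  // internalSchemaOption.isPresent() would throw on an uninitialized field.
  static boolean needsResolution() {
    return internalSchemaOption == null || !internalSchemaOption.isPresent();
  }
}
```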
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java:
##########
@@ -159,27 +151,26 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
* Do schema evolution for ParquetFormat.
*/
public void doEvolutionForParquetFormat() {
- if (internalSchemaOption.isPresent()) {
+ List<String> requiredColumns = getRequireColumn(job);
+ // No need trigger schema evolution for count(*)/count(1) operation
+ boolean disableSchemaEvolution = requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
+ if (!disableSchemaEvolution) {
+ if (!internalSchemaOption.isPresent()) {
+ internalSchemaOption = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
+ }
// reading hoodie schema evolution table
job.setBoolean(HIVE_EVOLUTION_ENABLE, true);
- Path finalPath = ((FileSplit)split).getPath();
+ Path finalPath = ((FileSplit) split).getPath();
InternalSchema prunedSchema;
- List<String> requiredColumns = getRequireColumn(job);
- // No need trigger schema evolution for count(*)/count(1) operation
- boolean disableSchemaEvolution =
-     requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
- if (!disableSchemaEvolution) {
- prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
- InternalSchema querySchema = prunedSchema;
- Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
- InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
- InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
-     true).mergeSchema();
- List<Types.Field> fields = mergedInternalSchema.columns();
- setColumnNameList(job, fields);
- setColumnTypeList(job, fields);
- pushDownFilter(job, querySchema, fileSchema);
- }
+ prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
+ InternalSchema querySchema = prunedSchema;
+ Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
+ InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
+ InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema();
+ List<Types.Field> fields = mergedInternalSchema.columns();
+ setColumnNameList(job, fields);
+ setColumnTypeList(job, fields);
+ pushDownFilter(job, querySchema, fileSchema);
Review Comment:
and should this part be guarded by `!internalSchemaOption.isPresent()`?
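
For illustration, calling `get()` on an absent optional without a presence guard fails at runtime, which is the risk this comment raises (sketch with `java.util.Optional` standing in for Hudi's `Option`):

```java
import java.util.NoSuchElementException;
import java.util.Optional;

public class UnguardedGet {
  // Returns true when get() on an empty Optional throws, demonstrating why
  // the prune/merge block may need an isPresent() guard.
  static boolean getThrowsWhenEmpty() {
    Optional<String> internalSchemaOption = Optional.empty();
    try {
      internalSchemaOption.get();
      return false;
    } catch (NoSuchElementException e) {
      return true;
    }
  }
}
```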
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -314,9 +314,7 @@ protected HoodieTimeline getActiveTimeline() {
private Object[] parsePartitionColumnValues(String[] partitionColumns, String partitionPath) {
Object[] partitionColumnValues = doParsePartitionColumnValues(partitionColumns, partitionPath);
if (shouldListLazily && partitionColumnValues.length != partitionColumns.length) {
- throw new HoodieException("Failed to parse partition column values from the partition-path:"
-     + " likely non-encoded slashes being used in partition column's values. You can try to"
-     + " work this around by switching listing mode to eager");
+ LOG.warn(">>> PartitionColumns: " + partitionColumns + " PartitionValues: " + partitionColumnValues);
Review Comment:
So I assume we still need to fail here instead of printing the warning and
letting it return?
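
Separate from the fail-vs-warn question: as written, the warning concatenates the arrays directly, which logs their identity hash codes rather than their contents. A small sketch of the difference (`Arrays.toString` is the usual fix):

```java
import java.util.Arrays;

public class ArrayLogging {
  // Concatenating an array calls its default toString(), producing something
  // like "[Ljava.lang.String;@1b6d3586" instead of the element values.
  static String direct(String[] partitionColumns) {
    return "PartitionColumns: " + partitionColumns;
  }

  // Arrays.toString renders the actual contents, e.g. "[region, day]".
  static String readable(String[] partitionColumns) {
    return "PartitionColumns: " + Arrays.toString(partitionColumns);
  }
}
```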
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java:
##########
@@ -159,27 +151,26 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
* Do schema evolution for ParquetFormat.
*/
public void doEvolutionForParquetFormat() {
- if (internalSchemaOption.isPresent()) {
+ List<String> requiredColumns = getRequireColumn(job);
+ // No need trigger schema evolution for count(*)/count(1) operation
+ boolean disableSchemaEvolution = requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
Review Comment:
I see. This is existing logic. Still wondering the same question.
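
For context on why the existing check tests both `isEmpty()` and a single empty element: when the projection string set for count(*)/count(1) is empty, splitting it still yields one empty-string element. A minimal sketch, assuming the column list comes from splitting a comma-separated config value (a hypothetical reduction of `getRequireColumn`):

```java
import java.util.Arrays;
import java.util.List;

public class EmptyProjection {
  // Hypothetical reduction of getRequireColumn: split a comma-separated
  // projection string into column names.
  static List<String> requiredColumns(String projection) {
    return Arrays.asList(projection.split(","));
  }

  // Mirrors the guard in the diff: "".split(",") returns [""], not an empty
  // array, so size() == 1 && get(0).isEmpty() must be checked as well.
  static boolean disableSchemaEvolution(List<String> cols) {
    return cols.isEmpty() || (cols.size() == 1 && cols.get(0).isEmpty());
  }
}
```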
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java:
##########
@@ -159,27 +151,26 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
* Do schema evolution for ParquetFormat.
*/
public void doEvolutionForParquetFormat() {
- if (internalSchemaOption.isPresent()) {
+ List<String> requiredColumns = getRequireColumn(job);
+ // No need trigger schema evolution for count(*)/count(1) operation
+ boolean disableSchemaEvolution = requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
Review Comment:
To clarify, do `requiredColumns` contain the columns from the predicate(s),
e.g., `count(*) where col1 is not null`?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java:
##########
@@ -159,27 +151,26 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
* Do schema evolution for ParquetFormat.
*/
public void doEvolutionForParquetFormat() {
- if (internalSchemaOption.isPresent()) {
+ List<String> requiredColumns = getRequireColumn(job);
+ // No need trigger schema evolution for count(*)/count(1) operation
+ boolean disableSchemaEvolution = requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
+ if (!disableSchemaEvolution) {
+ if (!internalSchemaOption.isPresent()) {
+ internalSchemaOption = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
Review Comment:
Should we still do try .. catch .. here in case the internal schema cannot be read?
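
One way the suggested try..catch could look, sketched with `java.util.Optional` and a generic resolver callback standing in for `TableSchemaResolver` (names hypothetical):

```java
import java.util.Optional;
import java.util.function.Supplier;

public class SafeSchemaLookup {
  // Runs the resolver and falls back to an absent Optional when the internal
  // schema cannot be read, instead of letting the exception escape.
  static Optional<String> resolveOrEmpty(Supplier<Optional<String>> resolver) {
    try {
      return resolver.get();
    } catch (Exception e) {
      // Could also log a warning here before falling back.
      return Optional.empty();
    }
  }
}
```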
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java:
##########
@@ -58,7 +58,7 @@ public HiveHoodieTableFileIndex(HoodieEngineContext
engineContext,
shouldIncludePendingCommits,
true,
new NoopCache(),
- false);
+ true);
Review Comment:
Now I remember we need to fix the lazy listing for the Hive File Index. Should
this be in a separate PR?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]