This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b6e201  [HUDI-3663] Fixing Column Stats index to properly handle 
first Data Table commit (#5070)
1b6e201 is described below

commit 1b6e201160ef18b0c7d86d1cafd104609489dad7
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Sat Mar 19 21:54:13 2022 -0700

    [HUDI-3663] Fixing Column Stats index to properly handle first Data Table 
commit (#5070)
    
    * Fixed metadata conversion util to extract schema from 
`HoodieCommitMetadata`
    
    * Fixed failure to fetch columns to index in empty table
    
    * Abort indexing sequence in case there are no columns to index
    
    * Fall back to indexing at least the primary key columns in case no writer 
schema could be obtained to index all columns
    
    * Fixed `getRecordKeyFields` incorrectly ignoring default value
    
    * Make sure Hudi metadata fields are also indexed
---
 .../hudi/client/functional/TestHoodieIndex.java    |  19 ++-
 .../hudi/common/table/HoodieTableConfig.java       |   8 +-
 .../java/org/apache/hudi/common/util/Option.java   |  16 ++-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 142 +++++++++++++--------
 4 files changed, 119 insertions(+), 66 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index 876a5d8..8c27e48 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -72,6 +72,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import scala.Tuple2;
@@ -258,19 +259,23 @@ public class TestHoodieIndex extends 
TestHoodieMetadataBase {
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     // Insert 200 records
-    JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, 
newCommitTime);
-    Assertions.assertNoWriteErrors(writeStatues.collect());
-    List<String> fileIds = writeStatues.map(WriteStatus::getFileId).collect();
-    // commit this upsert
-    writeClient.commit(newCommitTime, writeStatues);
+    JavaRDD<WriteStatus> writeStatusesRDD = writeClient.upsert(writeRecords, 
newCommitTime);
+    // NOTE: This will trigger an actual write
+    List<WriteStatus> writeStatuses = writeStatusesRDD.collect();
+    Assertions.assertNoWriteErrors(writeStatuses);
+    // Commit
+    writeClient.commit(newCommitTime, jsc.parallelize(writeStatuses));
+
+    List<String> fileIds = 
writeStatuses.stream().map(WriteStatus::getFileId).collect(Collectors.toList());
+
     HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
 
     // Now tagLocation for these records, hbaseIndex should tag them
     JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writeRecords, 
hoodieTable);
-    assert 
(javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 
totalRecords);
+    assertEquals(totalRecords, 
javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
 
     // check tagged records are tagged with correct fileIds
-    assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() 
== null).collect().size() == 0);
+    assertEquals(0, javaRDD.filter(record -> 
record.getCurrentLocation().getFileId() == null).collect().size());
     List<String> taggedFileIds = javaRDD.map(record -> 
record.getCurrentLocation().getFileId()).distinct().collect();
 
     Map<String, String> recordKeyToPartitionPathMap = new HashMap();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 077cc81..bc8a5c4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -461,11 +461,9 @@ public class HoodieTableConfig extends HoodieConfig {
   }
 
   public Option<String[]> getRecordKeyFields() {
-    if (contains(RECORDKEY_FIELDS)) {
-      return Option.of(Arrays.stream(getString(RECORDKEY_FIELDS).split(","))
-          .filter(p -> p.length() > 
0).collect(Collectors.toList()).toArray(new String[] {}));
-    }
-    return Option.empty();
+    String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    return Option.of(Arrays.stream(keyFieldsValue.split(","))
+        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}));
   }
 
   public Option<String[]> getPartitionFields() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java
index 193bf53..3d4bfcb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java
@@ -34,7 +34,7 @@ public final class Option<T> implements Serializable {
 
   private static final long serialVersionUID = 0L;
 
-  private static final Option<?> NULL_VAL = new Option<>();
+  private static final Option<?> EMPTY = new Option<>();
 
   private final T val;
 
@@ -67,8 +67,9 @@ public final class Option<T> implements Serializable {
     this.val = val;
   }
 
+  @SuppressWarnings("unchecked")
   public static <T> Option<T> empty() {
-    return (Option<T>) NULL_VAL;
+    return (Option<T>) EMPTY;
   }
 
   public static <T> Option<T> of(T value) {
@@ -108,6 +109,17 @@ public final class Option<T> implements Serializable {
     }
   }
 
+  public <U> Option<U> flatMap(Function<? super T, Option<U>> mapper) {
+    if (null == mapper) {
+      throw new NullPointerException("mapper should not be null");
+    }
+    if (!isPresent()) {
+      return empty();
+    } else {
+      return Objects.requireNonNull(mapper.apply(val));
+    }
+  }
+
   /**
    * Returns this {@link Option} if not empty, otherwise evaluates the 
provided supplier
    * and returns the alternative
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index f0388cc..4390e87 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -67,6 +68,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -78,6 +80,7 @@ import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString;
 import static 
org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION;
 import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MAX;
@@ -86,6 +89,7 @@ import static 
org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.NULL_
 import static 
org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_SIZE;
 import static 
org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE;
 import static 
org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
 
@@ -379,15 +383,24 @@ public class HoodieTableMetadataUtil {
       deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, 
entry)));
     });
 
-    final List<String> columnsToIndex = 
getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), 
recordsGenerationParams.isAllColumnStatsIndexEnabled());
-    final int parallelism = Math.max(Math.min(deleteFileList.size(), 
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    HoodieData<Pair<String, String>> deleteFileListRDD = 
engineContext.parallelize(deleteFileList, parallelism);
-    return deleteFileListRDD.flatMap(deleteFileInfoPair -> {
-      if 
(deleteFileInfoPair.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension()))
 {
-        return getColumnStats(deleteFileInfoPair.getLeft(), 
deleteFileInfoPair.getRight(), recordsGenerationParams.getDataMetaClient(), 
columnsToIndex, true).iterator();
-      }
-      return Collections.emptyListIterator();
-    });
+    HoodieTableMetaClient dataTableMetaClient = 
recordsGenerationParams.getDataMetaClient();
+
+    List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
+        dataTableMetaClient.getTableConfig(), 
tryResolveSchemaForTable(dataTableMetaClient));
+
+    if (columnsToIndex.isEmpty()) {
+      // In case there are no columns to index, bail
+      return engineContext.emptyHoodieData();
+    }
+
+    int parallelism = Math.max(Math.min(deleteFileList.size(), 
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+    return engineContext.parallelize(deleteFileList, parallelism)
+        .flatMap(deleteFileInfoPair -> {
+          if 
(deleteFileInfoPair.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension()))
 {
+            return getColumnStats(deleteFileInfoPair.getLeft(), 
deleteFileInfoPair.getRight(), dataTableMetaClient, columnsToIndex, 
true).iterator();
+          }
+          return Collections.emptyListIterator();
+        });
   }
 
   /**
@@ -698,7 +711,15 @@ public class HoodieTableMetadataUtil {
                                                                           
Map<String, Map<String, Long>> partitionToAppendedFiles,
                                                                           
MetadataRecordsGenerationParams recordsGenerationParams) {
     HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
-    final List<String> columnsToIndex = 
getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), 
recordsGenerationParams.isAllColumnStatsIndexEnabled());
+    HoodieTableMetaClient dataTableMetaClient = 
recordsGenerationParams.getDataMetaClient();
+
+    final List<String> columnsToIndex = 
getColumnsToIndex(recordsGenerationParams,
+        dataTableMetaClient.getTableConfig(), 
tryResolveSchemaForTable(dataTableMetaClient));
+
+    if (columnsToIndex.isEmpty()) {
+      // In case there are no columns to index, bail
+      return engineContext.emptyHoodieData();
+    }
 
     final List<Pair<String, List<String>>> partitionToDeletedFilesList = 
partitionToDeletedFiles.entrySet()
         .stream().map(e -> Pair.of(e.getKey(), 
e.getValue())).collect(Collectors.toList());
@@ -712,7 +733,7 @@ public class HoodieTableMetadataUtil {
 
       return deletedFileList.stream().flatMap(deletedFile -> {
         final String filePathWithPartition = partitionName + "/" + deletedFile;
-        return getColumnStats(partition, filePathWithPartition, 
recordsGenerationParams.getDataMetaClient(), columnsToIndex, true);
+        return getColumnStats(partition, filePathWithPartition, 
dataTableMetaClient, columnsToIndex, true);
       }).iterator();
     });
     allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
@@ -733,7 +754,7 @@ public class HoodieTableMetadataUtil {
           return Stream.empty();
         }
         final String filePathWithPartition = partitionName + "/" + 
appendedFileNameLengthEntry.getKey();
-        return getColumnStats(partition, filePathWithPartition, 
recordsGenerationParams.getDataMetaClient(), columnsToIndex, false);
+        return getColumnStats(partition, filePathWithPartition, 
dataTableMetaClient, columnsToIndex, false);
       }).iterator();
 
     });
@@ -838,55 +859,59 @@ public class HoodieTableMetadataUtil {
   public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
                                                                              
HoodieEngineContext engineContext,
                                                                              
MetadataRecordsGenerationParams recordsGenerationParams) {
-    try {
-      List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
-          .flatMap(entry -> entry.stream()).collect(Collectors.toList());
-      return 
HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, 
allWriteStats, recordsGenerationParams);
-    } catch (Exception e) {
-      throw new HoodieException("Failed to generate column stats records for 
metadata table ", e);
-    }
-  }
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
 
-  /**
-   * Create column stats from write status.
-   *
-   * @param engineContext           - Engine context
-   * @param allWriteStats           - Write status to convert
-   * @param recordsGenerationParams - Parameters for columns stats record 
generation
-   */
-  public static HoodieData<HoodieRecord> 
createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
-                                                                         
List<HoodieWriteStat> allWriteStats,
-                                                                         
MetadataRecordsGenerationParams recordsGenerationParams) {
     if (allWriteStats.isEmpty()) {
       return engineContext.emptyHoodieData();
     }
-    final List<String> columnsToIndex = 
getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), 
recordsGenerationParams.isAllColumnStatsIndexEnabled());
-    final int parallelism = Math.max(Math.min(allWriteStats.size(), 
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    HoodieData<HoodieWriteStat> allWriteStatsRDD = 
engineContext.parallelize(allWriteStats, parallelism);
-    return allWriteStatsRDD.flatMap(writeStat -> 
translateWriteStatToColumnStats(writeStat, 
recordsGenerationParams.getDataMetaClient(), columnsToIndex).iterator());
+
+    try {
+      Option<Schema> writerSchema =
+          
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+              .flatMap(writerSchemaStr ->
+                  isNullOrEmpty(writerSchemaStr)
+                    ? Option.empty()
+                    : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+
+      HoodieTableMetaClient dataTableMetaClient = 
recordsGenerationParams.getDataMetaClient();
+      HoodieTableConfig tableConfig = dataTableMetaClient.getTableConfig();
+
+      // NOTE: Writer schema added to commit metadata will not contain Hudi's 
metadata fields
+      Option<Schema> tableSchema = writerSchema.map(schema ->
+          tableConfig.populateMetaFields() ? addMetadataFields(schema) : 
schema);
+
+      List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
+          tableConfig, tableSchema);
+
+      if (columnsToIndex.isEmpty()) {
+        // In case there are no columns to index, bail
+        return engineContext.emptyHoodieData();
+      }
+
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+      return engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat ->
+              translateWriteStatToColumnStats(writeStat, dataTableMetaClient, 
columnsToIndex).iterator());
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate column stats records for 
metadata table", e);
+    }
   }
 
   /**
    * Get the latest columns for the table for column stats indexing.
-   *
-   * @param datasetMetaClient                   - Data table meta client
-   * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing 
enabled for all columns
    */
-  private static List<String> getColumnsToIndex(HoodieTableMetaClient 
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
-    if (!isMetaIndexColumnStatsForAllColumns
-        || 
datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
 < 1) {
-      return 
Arrays.asList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp().split(","));
+  private static List<String> 
getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams,
+                                                HoodieTableConfig tableConfig,
+                                                Option<Schema> 
writerSchemaOpt) {
+    if (recordsGenParams.isAllColumnStatsIndexEnabled() && 
writerSchemaOpt.isPresent()) {
+      return writerSchemaOpt.get().getFields().stream()
+          .map(Schema.Field::name).collect(Collectors.toList());
     }
 
-    TableSchemaResolver schemaResolver = new 
TableSchemaResolver(datasetMetaClient);
-    // consider nested fields as well. if column stats is enabled only for a 
subset of columns,
-    // directly use them instead of all columns from the latest table schema
-    try {
-      return schemaResolver.getTableAvroSchema().getFields().stream()
-          .map(entry -> entry.name()).collect(Collectors.toList());
-    } catch (Exception e) {
-      throw new HoodieException("Failed to get latest columns for " + 
datasetMetaClient.getBasePath());
-    }
+    // In case no writer schema could be obtained we fall back to only index 
primary key
+    // columns
+    return Arrays.asList(tableConfig.getRecordKeyFields().get());
   }
 
   public static HoodieMetadataColumnStats 
mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, 
HoodieMetadataColumnStats newColumnStats) {
@@ -914,7 +939,7 @@ public class HoodieTableMetadataUtil {
       List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = 
new ArrayList<>(columnRangeMap.values());
       return 
HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), 
columnRangeMetadataList, false);
     }
-    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), 
datasetMetaClient, columnsToIndex,false);
+    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), 
datasetMetaClient, columnsToIndex, false);
   }
 
   private static Stream<HoodieRecord> getColumnStats(final String 
partitionPath, final String filePathWithPartition,
@@ -1023,7 +1048,7 @@ public class HoodieTableMetadataUtil {
       columnStats.put(TOTAL_SIZE, 
Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
       columnStats.put(TOTAL_UNCOMPRESSED_SIZE, 
Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) 
+ fieldSize);
 
-      if (!StringUtils.isNullOrEmpty(fieldVal)) {
+      if (!isNullOrEmpty(fieldVal)) {
         // set the min value of the field
         if (!columnStats.containsKey(MIN)) {
           columnStats.put(MIN, fieldVal);
@@ -1043,4 +1068,17 @@ public class HoodieTableMetadataUtil {
       }
     });
   }
+
+  private static Option<Schema> tryResolveSchemaForTable(HoodieTableMetaClient 
dataTableMetaClient) {
+    if 
(dataTableMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
 == 0) {
+      return Option.empty();
+    }
+
+    TableSchemaResolver schemaResolver = new 
TableSchemaResolver(dataTableMetaClient);
+    try {
+      return Option.of(schemaResolver.getTableAvroSchema());
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get latest columns for " + 
dataTableMetaClient.getBasePath(), e);
+    }
+  }
 }

Reply via email to