codope commented on code in PR #12714:
URL: https://github.com/apache/hudi/pull/12714#discussion_r1930011703


##########
hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java:
##########
@@ -59,14 +60,38 @@ public abstract class FileFormatUtils {
    * @param fileColumnRanges List of column range statistics for each file in 
a partition
    */
   public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInPartition(String relativePartitionPath,
-                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges) 
{
+                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges,
+                                                                               
                  Map<String, Schema> colsToIndexSchemaMap) {
     ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(), 
"fileColumnRanges should not be empty.");
     // There are multiple files. Compute min(file_mins) and max(file_maxs)
     return fileColumnRanges.stream()
         .map(e -> HoodieColumnRangeMetadata.create(
             relativePartitionPath, e.getColumnName(), e.getMinValue(), 
e.getMaxValue(),
             e.getNullCount(), e.getValueCount(), e.getTotalSize(), 
e.getTotalUncompressedSize()))
-        .reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new 
HoodieException("MergingColumnRanges failed."));
+        .reduce((a,b) -> {
+          try {
+            if (colsToIndexSchemaMap.isEmpty() || (a.getMinValue() == null || 
b.getMinValue() == null)

Review Comment:
   Also, for my understanding, when can `colsToIndexSchemaMap` be empty?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java:
##########
@@ -59,14 +60,38 @@ public abstract class FileFormatUtils {
    * @param fileColumnRanges List of column range statistics for each file in 
a partition
    */
   public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInPartition(String relativePartitionPath,
-                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges) 
{
+                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges,
+                                                                               
                  Map<String, Schema> colsToIndexSchemaMap) {
     ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(), 
"fileColumnRanges should not be empty.");
     // There are multiple files. Compute min(file_mins) and max(file_maxs)
     return fileColumnRanges.stream()
         .map(e -> HoodieColumnRangeMetadata.create(
             relativePartitionPath, e.getColumnName(), e.getMinValue(), 
e.getMaxValue(),
             e.getNullCount(), e.getValueCount(), e.getTotalSize(), 
e.getTotalUncompressedSize()))
-        .reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new 
HoodieException("MergingColumnRanges failed."));
+        .reduce((a,b) -> {
+          try {
+            if (colsToIndexSchemaMap.isEmpty() || (a.getMinValue() == null || 
b.getMinValue() == null)
+                || 
(a.getMinValue().getClass().getSimpleName().equals(b.getMinValue().getClass().getSimpleName())
+                && 
a.getMaxValue().getClass().getSimpleName().equals(b.getMaxValue().getClass().getSimpleName())))
 {

Review Comment:
   if the goal is to do proper type check then we can simply do
   ```
   a.getMinValue().getClass().equals(b.getMinValue().getClass()) 
   && a.getMaxValue().getClass().equals(b.getMaxValue().getClass())
   ```
   Trying to avoid `getSimpleName()` for type checks (in theory, different 
classes in different packages can share the same simple name).



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1814,16 +1825,97 @@ private static Comparable<?> coerceToComparable(Schema 
schema, Object val) {
     }
   }
 
-  private static boolean isColumnTypeSupported(Schema schema, 
Option<HoodieRecordType> recordType) {
+  private static Integer castToInteger(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return (Integer) val;
+    } else if (val instanceof Long) {
+      return ((Long) val).intValue();
+    } else if (val instanceof Float) {
+      return ((Float)val).intValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).intValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1 : 0;
+    }  else {
+      // best effort casting
+      return Integer.parseInt(val.toString());
+    }
+  }
+
+  private static Long castToLong(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).longValue();
+    } else if (val instanceof Long) {
+      return ((Long) val);
+    } else if (val instanceof Float) {
+      return ((Float)val).longValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).longValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1L : 0L;
+    }  else {
+      // best effort casting
+      return Long.parseLong(val.toString());
+    }
+  }
+
+  private static Float castToFloat(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).floatValue();
+    } else if (val instanceof Long) {
+      return ((Long) val).floatValue();
+    } else if (val instanceof Float) {
+      return ((Float)val).floatValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).floatValue();
+    } else if (val instanceof Boolean) {
+      return ((Float) val);
+    }  else {
+      // best effort casting
+      return Float.parseFloat(val.toString());
+    }
+  }
+
+  private static Double castToDouble(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).doubleValue();
+    } else if (val instanceof Long) {
+      return ((Long) val).doubleValue();
+    } else if (val instanceof Float) {
+      return ((Float)val).doubleValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).doubleValue();
+    } else if (val instanceof Boolean) {
+      return ((Double) val);

Review Comment:
   same here.. probably you intended:
   ```
   return (Boolean) val ? 1.0d : 0.0d;
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java:
##########
@@ -59,14 +60,38 @@ public abstract class FileFormatUtils {
    * @param fileColumnRanges List of column range statistics for each file in 
a partition
    */
   public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInPartition(String relativePartitionPath,
-                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges) 
{
+                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges,
+                                                                               
                  Map<String, Schema> colsToIndexSchemaMap) {
     ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(), 
"fileColumnRanges should not be empty.");
     // There are multiple files. Compute min(file_mins) and max(file_maxs)
     return fileColumnRanges.stream()
         .map(e -> HoodieColumnRangeMetadata.create(
             relativePartitionPath, e.getColumnName(), e.getMinValue(), 
e.getMaxValue(),
             e.getNullCount(), e.getValueCount(), e.getTotalSize(), 
e.getTotalUncompressedSize()))
-        .reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new 
HoodieException("MergingColumnRanges failed."));
+        .reduce((a,b) -> {
+          try {
+            if (colsToIndexSchemaMap.isEmpty() || (a.getMinValue() == null || 
b.getMinValue() == null)
+                || 
(a.getMinValue().getClass().getSimpleName().equals(b.getMinValue().getClass().getSimpleName())
+                && 
a.getMaxValue().getClass().getSimpleName().equals(b.getMaxValue().getClass().getSimpleName())))
 {

Review Comment:
   Would be better if we extract the conditional to a separate boolean method 
like `isSchemaEvolvingForColumn` and add comment for each condition.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java:
##########
@@ -59,14 +60,38 @@ public abstract class FileFormatUtils {
    * @param fileColumnRanges List of column range statistics for each file in 
a partition
    */
   public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInPartition(String relativePartitionPath,
-                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges) 
{
+                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges,
+                                                                               
                  Map<String, Schema> colsToIndexSchemaMap) {
     ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(), 
"fileColumnRanges should not be empty.");
     // There are multiple files. Compute min(file_mins) and max(file_maxs)
     return fileColumnRanges.stream()
         .map(e -> HoodieColumnRangeMetadata.create(
             relativePartitionPath, e.getColumnName(), e.getMinValue(), 
e.getMaxValue(),
             e.getNullCount(), e.getValueCount(), e.getTotalSize(), 
e.getTotalUncompressedSize()))
-        .reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new 
HoodieException("MergingColumnRanges failed."));
+        .reduce((a,b) -> {
+          try {
+            if (colsToIndexSchemaMap.isEmpty() || (a.getMinValue() == null || 
b.getMinValue() == null)

Review Comment:
   Do we also need to check for `a.getMaxValue() == null || b.getMaxValue() == 
null`?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java:
##########
@@ -59,14 +60,38 @@ public abstract class FileFormatUtils {
    * @param fileColumnRanges List of column range statistics for each file in 
a partition
    */
   public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInPartition(String relativePartitionPath,
-                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges) 
{
+                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges,
+                                                                               
                  Map<String, Schema> colsToIndexSchemaMap) {
     ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(), 
"fileColumnRanges should not be empty.");
     // There are multiple files. Compute min(file_mins) and max(file_maxs)
     return fileColumnRanges.stream()
         .map(e -> HoodieColumnRangeMetadata.create(
             relativePartitionPath, e.getColumnName(), e.getMinValue(), 
e.getMaxValue(),
             e.getNullCount(), e.getValueCount(), e.getTotalSize(), 
e.getTotalUncompressedSize()))
-        .reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new 
HoodieException("MergingColumnRanges failed."));
+        .reduce((a,b) -> {
+          try {
+            if (colsToIndexSchemaMap.isEmpty() || (a.getMinValue() == null || 
b.getMinValue() == null)
+                || 
(a.getMinValue().getClass().getSimpleName().equals(b.getMinValue().getClass().getSimpleName())
+                && 
a.getMaxValue().getClass().getSimpleName().equals(b.getMaxValue().getClass().getSimpleName())))
 {
+              return HoodieColumnRangeMetadata.merge(a, b);
+            } else {
+              // schema is evolving for the column of interest.
+              Schema schema = colsToIndexSchemaMap.get(a.getColumnName());
+              HoodieColumnRangeMetadata<T> left = 
HoodieColumnRangeMetadata.create(a.getFilePath(), a.getColumnName(),
+                  (T) HoodieTableMetadataUtil.coerceToComparable(schema, 
a.getMinValue()),
+                  (T) HoodieTableMetadataUtil.coerceToComparable(schema, 
a.getMaxValue()), a.getNullCount(),
+                  a.getValueCount(), a.getTotalSize(), 
a.getTotalUncompressedSize());
+              HoodieColumnRangeMetadata<T> right = 
HoodieColumnRangeMetadata.create(b.getFilePath(), b.getColumnName(),
+                  (T) HoodieTableMetadataUtil.coerceToComparable(schema, 
b.getMinValue()),
+                  (T) HoodieTableMetadataUtil.coerceToComparable(schema, 
b.getMaxValue()), b.getNullCount(),
+                  b.getValueCount(), b.getTotalSize(), 
b.getTotalUncompressedSize());

Review Comment:
   Can we somehow figure out upfront whether schema has evolved for a 
particular column or not? If so, then we can move type-coercion into a single 
place (the mapping step), rather than doing it repeatedly in the reduce. 
Merging then becomes simpler, since everything is already in the correct form.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1814,16 +1825,97 @@ private static Comparable<?> coerceToComparable(Schema 
schema, Object val) {
     }
   }
 
-  private static boolean isColumnTypeSupported(Schema schema, 
Option<HoodieRecordType> recordType) {
+  private static Integer castToInteger(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return (Integer) val;
+    } else if (val instanceof Long) {
+      return ((Long) val).intValue();
+    } else if (val instanceof Float) {
+      return ((Float)val).intValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).intValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1 : 0;
+    }  else {
+      // best effort casting
+      return Integer.parseInt(val.toString());
+    }
+  }
+
+  private static Long castToLong(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).longValue();
+    } else if (val instanceof Long) {
+      return ((Long) val);
+    } else if (val instanceof Float) {
+      return ((Float)val).longValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).longValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1L : 0L;
+    }  else {
+      // best effort casting
+      return Long.parseLong(val.toString());

Review Comment:
   Do we need to handle `NumberFormatException` gracefully? Same question for 
other types.
   I guess the caller is already from `case LONG` so we shold not see this, but 
just in case..



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java:
##########
@@ -67,7 +68,7 @@ public static void updateColsToIndex(HoodieTable dataTable, 
HoodieWriteConfig co
           if 
(mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
 {
             // update data table's table config for list of columns indexed.
             List<String> columnsToIndex = 
HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, 
dataTable.getMetaClient(), config.getMetadataConfig(),
-                Option.of(config.getRecordMerger().getRecordType()));
+                
Option.of(config.getRecordMerger().getRecordType())).keySet().stream().collect(Collectors.toList());

Review Comment:
   nit: i find it more readable to do `new ArrayList(map.keySet())` instead of 
doing `map.keySet().stream().collect(Collectors.toList())`, especially when 
columns are going to be only a few hundreds and so there isn't any perf diff 
between the two. 



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1814,16 +1825,97 @@ private static Comparable<?> coerceToComparable(Schema 
schema, Object val) {
     }
   }
 
-  private static boolean isColumnTypeSupported(Schema schema, 
Option<HoodieRecordType> recordType) {
+  private static Integer castToInteger(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return (Integer) val;
+    } else if (val instanceof Long) {
+      return ((Long) val).intValue();
+    } else if (val instanceof Float) {
+      return ((Float)val).intValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).intValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1 : 0;
+    }  else {
+      // best effort casting
+      return Integer.parseInt(val.toString());
+    }
+  }
+
+  private static Long castToLong(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).longValue();
+    } else if (val instanceof Long) {
+      return ((Long) val);
+    } else if (val instanceof Float) {
+      return ((Float)val).longValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).longValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1L : 0L;
+    }  else {
+      // best effort casting
+      return Long.parseLong(val.toString());
+    }
+  }
+
+  private static Float castToFloat(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).floatValue();
+    } else if (val instanceof Long) {
+      return ((Long) val).floatValue();
+    } else if (val instanceof Float) {
+      return ((Float)val).floatValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).floatValue();
+    } else if (val instanceof Boolean) {
+      return ((Float) val);
+    }  else {
+      // best effort casting
+      return Float.parseFloat(val.toString());
+    }
+  }
+
+  private static Double castToDouble(Object val) {

Review Comment:
   let's make sure we add UTs for these casting methods including corner cases.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala:
##########
@@ -54,7 +54,7 @@ class TestBasicSchemaEvolution extends 
HoodieSparkClientTestBase with ScalaAsser
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
-    HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "false" 
// HUDI-8587
+    HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true"

Review Comment:
   Can we add some validation for partition stats read/write in this test? 
Something like:
   1. Start with MOR table having an int field.
   2. Promote to long in an upsert.
   3. Validate partition stats content.
   4. Validate partition pruning using stats.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java:
##########
@@ -59,14 +60,38 @@ public abstract class FileFormatUtils {
    * @param fileColumnRanges List of column range statistics for each file in 
a partition
    */
   public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInPartition(String relativePartitionPath,
-                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges) 
{
+                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges,
+                                                                               
                  Map<String, Schema> colsToIndexSchemaMap) {
     ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(), 
"fileColumnRanges should not be empty.");
     // There are multiple files. Compute min(file_mins) and max(file_maxs)
     return fileColumnRanges.stream()
         .map(e -> HoodieColumnRangeMetadata.create(
             relativePartitionPath, e.getColumnName(), e.getMinValue(), 
e.getMaxValue(),
             e.getNullCount(), e.getValueCount(), e.getTotalSize(), 
e.getTotalUncompressedSize()))
-        .reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new 
HoodieException("MergingColumnRanges failed."));
+        .reduce((a,b) -> {
+          try {
+            if (colsToIndexSchemaMap.isEmpty() || (a.getMinValue() == null || 
b.getMinValue() == null)
+                || 
(a.getMinValue().getClass().getSimpleName().equals(b.getMinValue().getClass().getSimpleName())
+                && 
a.getMaxValue().getClass().getSimpleName().equals(b.getMaxValue().getClass().getSimpleName())))
 {
+              return HoodieColumnRangeMetadata.merge(a, b);
+            } else {
+              // schema is evolving for the column of interest.
+              Schema schema = colsToIndexSchemaMap.get(a.getColumnName());
+              HoodieColumnRangeMetadata<T> left = 
HoodieColumnRangeMetadata.create(a.getFilePath(), a.getColumnName(),
+                  (T) HoodieTableMetadataUtil.coerceToComparable(schema, 
a.getMinValue()),
+                  (T) HoodieTableMetadataUtil.coerceToComparable(schema, 
a.getMaxValue()), a.getNullCount(),
+                  a.getValueCount(), a.getTotalSize(), 
a.getTotalUncompressedSize());
+              HoodieColumnRangeMetadata<T> right = 
HoodieColumnRangeMetadata.create(b.getFilePath(), b.getColumnName(),
+                  (T) HoodieTableMetadataUtil.coerceToComparable(schema, 
b.getMinValue()),
+                  (T) HoodieTableMetadataUtil.coerceToComparable(schema, 
b.getMaxValue()), b.getNullCount(),
+                  b.getValueCount(), b.getTotalSize(), 
b.getTotalUncompressedSize());
+              return HoodieColumnRangeMetadata.merge(left, right);
+            }
+          } catch (ClassCastException cce) {
+            System.out.println("asdfadsf");
+            throw cce;

Review Comment:
   replace println by LOG if necessary and wrap exception in HoodieException



##########
hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java:
##########
@@ -42,7 +43,7 @@ public void testGetColumnRangeInPartition() {
         "path/to/file2", COLUMN_NAME, 3, 8, 1, 15, 120, 250);
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = 
Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    HoodieColumnRangeMetadata<Comparable> result = 
FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges);
+    HoodieColumnRangeMetadata<Comparable> result = 
FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges, 
Collections.emptyMap());

Review Comment:
   Let's UT the method for non-empty map and non-null min/max values where type 
promotion happens and type-coercion path is exercised.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1512,35 +1513,44 @@ public static List<String> 
getColumnsToIndex(HoodieCommitMetadata commitMetadata
   static final String[] META_COLS_TO_ALWAYS_INDEX = 
{COMMIT_TIME_METADATA_FIELD, RECORD_KEY_METADATA_FIELD, 
PARTITION_PATH_METADATA_FIELD};
   @VisibleForTesting
   public static final Set<String> META_COL_SET_TO_INDEX = new 
HashSet<>(Arrays.asList(META_COLS_TO_ALWAYS_INDEX));
+  @VisibleForTesting
+  static final Map<String, Schema> META_COLS_TO_ALWAYS_INDEX_SCHEMA_MAP = new 
HashMap<String, Schema>() {{
+      put(COMMIT_TIME_METADATA_FIELD, Schema.create(Schema.Type.STRING));
+      put(RECORD_KEY_METADATA_FIELD, Schema.create(Schema.Type.STRING));
+      put(PARTITION_PATH_METADATA_FIELD, Schema.create(Schema.Type.STRING));
+    }};
 
   @VisibleForTesting
-  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+  public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig 
tableConfig,
                                                HoodieMetadataConfig 
metadataConfig,
                                                Lazy<Option<Schema>> 
tableSchemaLazyOpt,
                                                Option<HoodieRecordType> 
recordType) {
     return getColumnsToIndex(tableConfig, metadataConfig, tableSchemaLazyOpt, 
false, recordType);
   }
 
   @VisibleForTesting
-  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+  public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig 
tableConfig,
                                                HoodieMetadataConfig 
metadataConfig,
                                                Lazy<Option<Schema>> 
tableSchemaLazyOpt,
                                                boolean isTableInitializing) {
     return getColumnsToIndex(tableConfig, metadataConfig, tableSchemaLazyOpt, 
isTableInitializing, Option.empty());
   }
 
   @VisibleForTesting
-  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+  public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig 
tableConfig,
                                                HoodieMetadataConfig 
metadataConfig,
                                                Lazy<Option<Schema>> 
tableSchemaLazyOpt,
                                                boolean isTableInitializing,
                                                Option<HoodieRecordType> 
recordType) {
-    Stream<String> columnsToIndexWithoutRequiredMetas = 
getColumnsToIndexWithoutRequiredMetaFields(metadataConfig, tableSchemaLazyOpt, 
isTableInitializing, recordType);
+    Map<String, Schema> columnsToIndexWithoutRequiredMetas = 
getColumnsToIndexWithoutRequiredMetaFields(metadataConfig, tableSchemaLazyOpt, 
isTableInitializing, recordType);
     if (!tableConfig.populateMetaFields()) {
-      return columnsToIndexWithoutRequiredMetas.collect(Collectors.toList());
+      return columnsToIndexWithoutRequiredMetas;
     }
 
-    return Stream.concat(Arrays.stream(META_COLS_TO_ALWAYS_INDEX), 
columnsToIndexWithoutRequiredMetas).collect(Collectors.toList());
+    Map<String, Schema> colsToIndexSchemaMap = new LinkedHashMap<>();

Review Comment:
   Why are we throwing away `columnsToIndexWithoutRequiredMetas` here? Should 
this be `new LinkedHashMap<>(columnsToIndexWithoutRequiredMetas)` instead?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1814,16 +1825,97 @@ private static Comparable<?> coerceToComparable(Schema 
schema, Object val) {
     }
   }
 
-  private static boolean isColumnTypeSupported(Schema schema, 
Option<HoodieRecordType> recordType) {
+  private static Integer castToInteger(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return (Integer) val;
+    } else if (val instanceof Long) {
+      return ((Long) val).intValue();
+    } else if (val instanceof Float) {
+      return ((Float)val).intValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).intValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1 : 0;
+    }  else {
+      // best effort casting
+      return Integer.parseInt(val.toString());
+    }
+  }
+
+  private static Long castToLong(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).longValue();
+    } else if (val instanceof Long) {
+      return ((Long) val);
+    } else if (val instanceof Float) {
+      return ((Float)val).longValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).longValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1L : 0L;
+    }  else {
+      // best effort casting
+      return Long.parseLong(val.toString());
+    }
+  }
+
+  private static Float castToFloat(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).floatValue();
+    } else if (val instanceof Long) {
+      return ((Long) val).floatValue();
+    } else if (val instanceof Float) {
+      return ((Float)val).floatValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).floatValue();
+    } else if (val instanceof Boolean) {
+      return ((Float) val);

Review Comment:
   copy-paste err probably? Attempting `(Float) val` when val is `Boolean` will 
throw a `ClassCastException` right. Maybe you intended this:
   ```
   return (Boolean) val ? 1.0f : 0.0f;
   ```



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1814,16 +1825,97 @@ private static Comparable<?> coerceToComparable(Schema 
schema, Object val) {
     }
   }
 
-  private static boolean isColumnTypeSupported(Schema schema, 
Option<HoodieRecordType> recordType) {
+  private static Integer castToInteger(Object val) {

Review Comment:
   What's the reason for returning `Integer` (boxing/unboxing) and not primitve 
`int`? Is it because you need `null` to represent no value? Same question for 
other types.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to