This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 034cf019346d [HUDI-9685] Merge row groups for file stitching with 
Parquet API and group by schema (#13674)
034cf019346d is described below

commit 034cf019346d618a7f6ac8c31deff0a73c39c317
Author: Xinli Shang <[email protected]>
AuthorDate: Sun Aug 24 06:27:31 2025 -0700

    [HUDI-9685] Merge row groups for file stitching with Parquet API and group 
by schema (#13674)
    
    * [HUDI-9685] Add schema evolution control feature for binary copy 
operations for file merging
    
    * Address comments: Change to default of disabling schema evolution
    
    * Add a setting method to change the new flag
    
    * Refactor the code
    
    * Change the name of config
    
    * Rename the config name to follow the convention
    
    * retrigger CI
    
    * Fix test error
    
    * Change the config name
    
    * Add sparkStreamCopyClustering strategy
    
    * Remove unused class
---
 .../apache/hudi/config/HoodieClusteringConfig.java |  19 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../org/apache/hudi/io/HoodieBinaryCopyHandle.java |  29 +-
 .../config/TestHoodieWriteConfigFileStitching.java |  73 +++++
 .../TestHoodieBinaryCopyHandleSchemaEvolution.java | 221 ++++++++++++++
 .../TestPartitionAwareClusteringPlanStrategy.java  |   6 +-
 .../SparkStreamCopyClusteringPlanStrategy.java     | 217 ++++++++++++++
 ...SparkStreamCopyClusteringExecutionStrategy.java | 110 +++++++
 .../hudi/io/storage/HoodieFileBinaryCopier.java    |   3 +-
 .../org/apache/hudi/common/util/ParquetUtils.java  |  15 +
 .../parquet/io/HoodieParquetBinaryCopyBase.java    |  68 ++++-
 .../parquet/io/HoodieParquetFileBinaryCopier.java  |   6 +-
 .../apache/hudi/common/util/TestParquetUtils.java  | 105 +++++++
 ...HoodieParquetBinaryCopyBaseSchemaEvolution.java | 324 +++++++++++++++++++++
 .../io/TestHoodieParquetFileBinaryCopier.java      |  17 +-
 .../TestClusteringBinaryCopyStrategy.scala         |   6 +-
 16 files changed, 1193 insertions(+), 30 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index c1e0887e15cd..766b64f0eb1f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -57,8 +57,12 @@ public class HoodieClusteringConfig extends HoodieConfig {
       
"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy";
   public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
       
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
+  public static final String SPARK_STREAM_COPY_CLUSTERING_PLAN_STRATEGY =
+      
"org.apache.hudi.client.clustering.plan.strategy.SparkStreamCopyClusteringPlanStrategy";
   public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
       
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
+  public static final String SPARK_STREAM_COPY_CLUSTERING_EXECUTION_STRATEGY =
+      
"org.apache.hudi.client.clustering.run.strategy.SparkStreamCopyClusteringExecutionStrategy";
   public static final String SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY =
       
"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy";
   public static final String 
SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY =
@@ -343,6 +347,16 @@ public class HoodieClusteringConfig extends HoodieConfig {
           + "Please exercise caution while setting this config, especially 
when clustering is done very frequently. This could lead to race condition in "
           + "rare scenarios, for example, when the clustering completes after 
instants are fetched but before rollback completed.");
 
+  public static final ConfigProperty<Boolean> 
FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE = ConfigProperty
+      .key(CLUSTERING_STRATEGY_PARAM_PREFIX + 
"binary.copy.schema.evolution.enable")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Enable schema evolution support for binary file 
stitching during clustering. "
+          + "When enabled, allows clustering of files with different but 
compatible schemas (e.g., files with added columns). "
+          + "When disabled (default), only files with identical schemas will 
be clustered together, providing better performance "
+          + "but requiring schema consistency across all files in a clustering 
group.");
+
   /**
    * @deprecated Use {@link #PLAN_STRATEGY_CLASS_NAME} and its methods instead
    */
@@ -611,6 +625,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withFileStitchingBinaryCopySchemaEvolutionEnabled(Boolean 
enabled) {
+      
clusteringConfig.setValue(FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE, 
String.valueOf(enabled));
+      return this;
+    }
+
     public Builder withDataOptimizeStrategy(String strategy) {
       clusteringConfig.setValue(LAYOUT_OPTIMIZE_STRATEGY, strategy);
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 1db77f325fc8..dd244f4aefc7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1847,6 +1847,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return 
getBoolean(HoodieClusteringConfig.ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT);
   }
 
+  public boolean isBinaryCopySchemaEvolutionEnabled() {
+    return 
getBooleanOrDefault(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE);
+  }
+
   public int getInlineClusterMaxCommits() {
     return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBinaryCopyHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBinaryCopyHandle.java
index b8ad1ba797fd..47bd7b0aae42 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBinaryCopyHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBinaryCopyHandle.java
@@ -30,6 +30,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.util.HoodieFileMetadataMerger;
 import org.apache.hudi.io.storage.HoodieFileBinaryCopier;
 import org.apache.hudi.parquet.io.HoodieParquetFileBinaryCopier;
+import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
@@ -63,6 +64,26 @@ public class HoodieBinaryCopyHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I,
   protected long recordsWritten = 0;
   protected long insertRecordsWritten = 0;
 
+  private MessageType getWriteSchema(HoodieWriteConfig config, 
List<StoragePath> inputFiles, Configuration conf, HoodieTable<?, ?, ?, ?> 
table) {
+    if (!config.isBinaryCopySchemaEvolutionEnabled() && !inputFiles.isEmpty()) 
{
+      // When schema evolution is disabled, use the schema from the first 
input file
+      // All files should have the same schema in this case
+      try {
+        ParquetUtils parquetUtils = new ParquetUtils();
+        MessageType fileSchema = parquetUtils.readSchema(table.getStorage(), 
inputFiles.get(0));
+        LOG.info("Binary copy schema evolution disabled. Using schema from 
input file: " + inputFiles.get(0));
+        return fileSchema;
+      } catch (Exception e) {
+        LOG.error("Failed to read schema from input file", e);
+        throw new HoodieIOException("Failed to read schema from input file 
when schema evolution is disabled: " + inputFiles.get(0), 
+            e instanceof IOException ? (IOException) e : new IOException(e));
+      }
+    } else {
+      // Default behavior: use the table's write schema for evolution
+      return new AvroSchemaConverter(conf).convert(writeSchemaWithMetaFields);
+    }
+  }
+
   public HoodieBinaryCopyHandle(
       HoodieWriteConfig config,
       String instantTime,
@@ -74,7 +95,9 @@ public class HoodieBinaryCopyHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I,
     super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier, true);
     this.inputFiles = inputFilePaths;
     this.conf = hoodieTable.getStorageConf().unwrapAs(Configuration.class);
-    this.writeScheMessageType = new 
AvroSchemaConverter(conf).convert(writeSchemaWithMetaFields);
+    // When schema evolution is disabled, use the schema from one of the input 
files
+    // Otherwise, use the table's write schema
+    this.writeScheMessageType = getWriteSchema(config, inputFilePaths, conf, 
hoodieTable);
     HoodieFileMetadataMerger fileMetadataMerger = new 
HoodieFileMetadataMerger();
     this.path = makeNewPath(partitionPath);
     writeStatus.setFileId(fileId);
@@ -92,7 +115,9 @@ public class HoodieBinaryCopyHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I,
     HoodieTimer timer = HoodieTimer.start();
     long records = 0;
     try {
-      records = this.writer.binaryCopy(inputFiles, 
Collections.singletonList(path), writeScheMessageType, config.getProps());
+      boolean schemaEvolutionEnabled = 
config.isBinaryCopySchemaEvolutionEnabled();
+      LOG.info("Schema evolution enabled for binary copy: {}", 
schemaEvolutionEnabled);
+      records = this.writer.binaryCopy(inputFiles, 
Collections.singletonList(path), writeScheMessageType, schemaEvolutionEnabled);
     } catch (IOException e) {
       throw new HoodieIOException(e.getMessage(), e);
     } finally {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfigFileStitching.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfigFileStitching.java
new file mode 100644
index 000000000000..09d2381f8fa4
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfigFileStitching.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Tests for file stitching binary copy schema evolution configuration.
+ */
+public class TestHoodieWriteConfigFileStitching {
+
+  @Test
+  public void testFileStitchingBinaryCopySchemaEvolutionConfig() {
+    // Test default value (should be false)
+    HoodieWriteConfig config1 = HoodieWriteConfig.newBuilder()
+        .withPath("/test/path")
+        .build();
+    assertFalse(config1.isBinaryCopySchemaEvolutionEnabled(),
+        "File stitching binary copy schema evolution should be disabled by 
default");
+
+    // Test explicitly setting to false
+    Properties props = new Properties();
+    
props.setProperty(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE.key(),
 "false");
+    HoodieWriteConfig config2 = HoodieWriteConfig.newBuilder()
+        .withPath("/test/path")
+        .withProps(props)
+        .build();
+    assertFalse(config2.isBinaryCopySchemaEvolutionEnabled(),
+        "File stitching binary copy schema evolution should be disabled when 
explicitly set to false");
+
+    // Test explicitly setting to true
+    
props.setProperty(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE.key(),
 "true");
+    HoodieWriteConfig config3 = HoodieWriteConfig.newBuilder()
+        .withPath("/test/path")
+        .withProps(props)
+        .build();
+    assertTrue(config3.isBinaryCopySchemaEvolutionEnabled(),
+        "File stitching binary copy schema evolution should be enabled when 
explicitly set to true");
+  }
+
+  @Test
+  public void testClusteringConfigProperty() {
+    // Test that the clustering config property has the correct default value
+    
assertEquals(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE.key(),
+        
HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE.key(),
+        "Config key should match expected value");
+    
+    
assertFalse(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE.defaultValue(),
+        "Default value should be false to disable schema evolution");
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieBinaryCopyHandleSchemaEvolution.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieBinaryCopyHandleSchemaEvolution.java
new file mode 100644
index 000000000000..62e9269bc1ce
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieBinaryCopyHandleSchemaEvolution.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for HoodieBinaryCopyHandle schema evolution behavior.
+ */
+public class TestHoodieBinaryCopyHandleSchemaEvolution {
+
+  private HoodieWriteConfig config;
+  @Mock
+  private HoodieTable<?, ?, ?, ?> hoodieTable;
+  @Mock
+  private TaskContextSupplier taskContextSupplier;
+  @Mock
+  private HoodieStorage storage;
+  private Configuration hadoopConf;
+
+  private MessageType fileSchema;
+  private MessageType tableSchema;
+  private Schema avroSchema;
+  private List<StoragePath> inputFiles;
+
+  @BeforeEach
+  public void setUp() {
+    MockitoAnnotations.openMocks(this);
+    
+    // Initialize Hadoop Configuration
+    hadoopConf = new Configuration();
+    
+    // Setup test schemas
+    String avroSchemaStr = 
"{\"type\":\"record\",\"name\":\"TestRecord\",\"fields\":["
+        + "{\"name\":\"field1\",\"type\":\"string\"},"
+        + "{\"name\":\"field2\",\"type\":\"int\"}"
+        + "]}";
+    avroSchema = new Schema.Parser().parse(avroSchemaStr);
+    
+    // Create different schemas for file vs table
+    fileSchema = new AvroSchemaConverter().convert(avroSchema);
+    
+    String tableAvroSchemaStr = 
"{\"type\":\"record\",\"name\":\"TestRecord\",\"fields\":["
+        + "{\"name\":\"field1\",\"type\":\"string\"},"
+        + "{\"name\":\"field2\",\"type\":\"int\"},"
+        + 
"{\"name\":\"field3\",\"type\":[\"null\",\"string\"],\"default\":null}"
+        + "]}";
+    Schema tableAvroSchema = new Schema.Parser().parse(tableAvroSchemaStr);
+    tableSchema = new AvroSchemaConverter().convert(tableAvroSchema);
+    
+    inputFiles = Arrays.asList(new StoragePath("/test/file1.parquet"), new 
StoragePath("/test/file2.parquet"));
+    
+    // Setup mocks
+    when(hoodieTable.getStorage()).thenReturn(storage);
+    
when(hoodieTable.getStorageConf()).thenReturn(mock(org.apache.hudi.storage.StorageConfiguration.class));
+    
when(hoodieTable.getStorageConf().unwrapAs(Configuration.class)).thenReturn(hadoopConf);
+  }
+
+  @Test
+  public void testSchemaEvolutionDisabled_UsesFileSchema() throws Exception {
+    // Given: Schema evolution is disabled
+    config = HoodieWriteConfig.newBuilder()
+        .withPath("/dummy/path")
+        .build();
+    
config.setValue(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE,
 "false");
+    
+    // Mock ParquetUtils to return file schema
+    try (MockedConstruction<ParquetUtils> parquetUtilsConstruction = 
mockConstruction(ParquetUtils.class, 
+        (mock, context) -> {
+          when(mock.readSchema(eq(storage), 
eq(inputFiles.get(0)))).thenReturn(fileSchema);
+        })) {
+      
+      // When: Creating HoodieBinaryCopyHandle (we can't instantiate directly 
due to complex dependencies,
+      // so we test the getWriteSchema method logic indirectly)
+      TestableHoodieBinaryCopyHandle handle = new 
TestableHoodieBinaryCopyHandle();
+      MessageType result = handle.testGetWriteSchema(config, inputFiles, 
hadoopConf, hoodieTable);
+      
+      // Then: Should use file schema, not table schema
+      assertEquals(fileSchema, result);
+      // Verify that ParquetUtils was constructed and readSchema was called
+      assertEquals(1, parquetUtilsConstruction.constructed().size());
+    }
+  }
+
+  @Test
+  public void testSchemaEvolutionEnabled_UsesTableSchema() throws Exception {
+    // Given: Schema evolution is enabled (default)
+    config = HoodieWriteConfig.newBuilder()
+        .withPath("/dummy/path")
+        .build();
+    
config.setValue(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE,
 "true");
+    
+    // When: Creating HoodieBinaryCopyHandle
+    TestableHoodieBinaryCopyHandle handle = new 
TestableHoodieBinaryCopyHandle();
+    handle.setWriteSchemaWithMetaFields(avroSchema); // Set the expected table 
schema
+    MessageType result = handle.testGetWriteSchema(config, inputFiles, 
hadoopConf, hoodieTable);
+    
+    // Then: Should use table schema (converted from Avro), not file schema
+    assertNotNull(result);
+    // When evolution is enabled, it should use the table schema directly
+  }
+
+  @Test
+  public void testSchemaEvolutionDisabled_FileReadError_ThrowsException() 
throws Exception {
+    // Given: Schema evolution is disabled but file read fails
+    config = HoodieWriteConfig.newBuilder()
+        .withPath("/dummy/path")
+        .build();
+    
config.setValue(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE,
 "false");
+    
+    // When: Creating HoodieBinaryCopyHandle with a file that causes read error
+    TestableHoodieBinaryCopyHandle handle = new 
TestableHoodieBinaryCopyHandle();
+    handle.setWriteSchemaWithMetaFields(avroSchema);
+    handle.setSimulateFileReadError(true);
+    
+    // Then: Should throw HoodieIOException
+    assertThrows(HoodieIOException.class, () -> 
+        handle.testGetWriteSchema(config, inputFiles, hadoopConf, hoodieTable)
+    );
+  }
+
+  @Test
+  public void testSchemaEvolutionDisabled_EmptyInputFiles_UsesTableSchema() 
throws Exception {
+    // Given: Schema evolution is disabled but no input files
+    config = HoodieWriteConfig.newBuilder()
+        .withPath("/dummy/path")
+        .build();
+    
config.setValue(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE,
 "false");
+    
+    // When: Creating HoodieBinaryCopyHandle with empty input files
+    TestableHoodieBinaryCopyHandle handle = new 
TestableHoodieBinaryCopyHandle();
+    handle.setWriteSchemaWithMetaFields(avroSchema);
+    MessageType result = handle.testGetWriteSchema(config, Arrays.asList(), 
hadoopConf, hoodieTable);
+    
+    // Then: Should use table schema, not attempt to read from files
+    assertNotNull(result);
+  }
+
+  /**
+   * Testable subclass that exposes the getWriteSchema method for testing.
+   */
+  private static class TestableHoodieBinaryCopyHandle {
+    private Schema writeSchemaWithMetaFields;
+    private boolean simulateFileReadError = false;
+    
+    public void setWriteSchemaWithMetaFields(Schema schema) {
+      this.writeSchemaWithMetaFields = schema;
+    }
+    
+    public void setSimulateFileReadError(boolean simulateError) {
+      this.simulateFileReadError = simulateError;
+    }
+    
+    public MessageType testGetWriteSchema(HoodieWriteConfig config, 
List<StoragePath> inputFiles, 
+                                         Configuration conf, HoodieTable<?, ?, 
?, ?> table) {
+      if (!config.isBinaryCopySchemaEvolutionEnabled() && 
!inputFiles.isEmpty()) {
+        // When schema evolution is disabled, use the schema from the first 
input file
+        // All files should have the same schema in this case
+        try {
+          if (simulateFileReadError) {
+            throw new IOException("Simulated file read error");
+          }
+          ParquetUtils parquetUtils = new ParquetUtils();
+          MessageType fileSchema = parquetUtils.readSchema(table.getStorage(), 
inputFiles.get(0));
+          return fileSchema;
+        } catch (Exception e) {
+          throw new HoodieIOException("Failed to read schema from input file 
when schema evolution is disabled: " + inputFiles.get(0),
+              e instanceof IOException ? (IOException) e : new IOException(e));
+        }
+      } else {
+        // Default behavior: use the table's write schema for evolution
+        return new 
AvroSchemaConverter(conf).convert(writeSchemaWithMetaFields);
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
index e5ffda9fc53a..f5b8815658a7 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
@@ -26,6 +26,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -41,10 +42,12 @@ public class TestPartitionAwareClusteringPlanStrategy {
   HoodieTable table;
   @Mock
   HoodieEngineContext context;
+  
   HoodieWriteConfig hoodieWriteConfig;
 
   @BeforeEach
   public void setUp() {
+    MockitoAnnotations.openMocks(this);
     Properties props = new Properties();
     
props.setProperty("hoodie.clustering.plan.strategy.partition.regex.pattern", 
"2021072.*");
     this.hoodieWriteConfig = HoodieWriteConfig
@@ -86,4 +89,5 @@ public class TestPartitionAwareClusteringPlanStrategy {
       return null;
     }
   }
-}
+
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkStreamCopyClusteringPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkStreamCopyClusteringPlanStrategy.java
new file mode 100644
index 000000000000..2350f4bdc408
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkStreamCopyClusteringPlanStrategy.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
+import org.apache.hudi.util.Lazy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering Strategy that uses binary stream copy for execution.
+ * This strategy automatically sets the 
SparkStreamCopyClusteringExecutionStrategy
+ * as the execution strategy when building the clustering plan.
+ */
+public class SparkStreamCopyClusteringPlanStrategy<T>
+    extends SparkSizeBasedClusteringPlanStrategy<T> {
+  
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkStreamCopyClusteringPlanStrategy.class);
+  
+  private static final String SPARK_STREAM_COPY_CLUSTERING_EXECUTION_STRATEGY =
+      
"org.apache.hudi.client.clustering.run.strategy.SparkStreamCopyClusteringExecutionStrategy";
+  
+  public SparkStreamCopyClusteringPlanStrategy(HoodieTable table,
+                                               HoodieEngineContext 
engineContext,
+                                               HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+  
+  @Override
+  protected Map<String, String> getStrategyParams() {
+    Map<String, String> params = new HashMap<>();
+    if 
(!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
+      params.put(PLAN_STRATEGY_SORT_COLUMNS.key(), 
getWriteConfig().getClusteringSortColumns());
+    }
+    // Add any additional parameters specific to stream copy strategy
+    return params;
+  }
+  
+  @Override
+  protected Pair<Stream<HoodieClusteringGroup>, Boolean> 
buildClusteringGroupsForPartition(
+      String partitionPath, List<FileSlice> fileSlices) {
+    
+    // When schema evolution is disabled, use schema-aware grouping
+    // When schema evolution is enabled, use the default size-only grouping
+    if (!getWriteConfig().isBinaryCopySchemaEvolutionEnabled()) {
+      LOG.info("Schema evolution disabled, using schema-aware grouping for 
partition: {}", partitionPath);
+      return 
buildClusteringGroupsForPartitionWithSchemaGrouping(partitionPath, fileSlices);
+    } else {
+      LOG.info("Schema evolution enabled, using size-only grouping for 
partition: {}", partitionPath);
+      return super.buildClusteringGroupsForPartition(partitionPath, 
fileSlices);
+    }
+  }
+  
+  /**
+   * Create Clustering group based on files eligible for clustering with 
schema-aware grouping.
+   * When schema evolution is disabled, files are grouped by schema hash 
first, then by size.
+   * This ensures only files with identical schemas are clustered together.
+   */
+  private Pair<Stream<HoodieClusteringGroup>, Boolean> 
buildClusteringGroupsForPartitionWithSchemaGrouping(
+      String partitionPath, List<FileSlice> fileSlices) {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+    
+    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
+    List<FileSlice> currentGroup = new ArrayList<>();
+    
+    // Sort fileSlices before dividing
+    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+    sortedFileSlices.sort((o1, o2) -> (int)
+        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() 
: writeConfig.getParquetMaxFileSize())
+            - (o1.getBaseFile().isPresent() ? 
o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+    
+    long totalSizeSoFar = 0;
+    boolean partialScheduled = false;
+    Integer currentGroupSchemaHash = null; // Track schema hash for current 
group
+    
+    for (FileSlice currentSlice : sortedFileSlices) {
+      long currentSize = currentSlice.getBaseFile().isPresent() 
+          ? currentSlice.getBaseFile().get().getFileSize() : 
writeConfig.getParquetMaxFileSize();
+      
+      // Get schema hash from the file for schema-based grouping
+      Integer currentFileSchemaHash = getFileSchemaHash(currentSlice);
+      
+      // Check if we need to create a new group due to schema mismatch
+      boolean schemaMismatch = currentGroupSchemaHash != null && 
!currentGroupSchemaHash.equals(currentFileSchemaHash);
+      
+      // Check if max size is reached OR schema is different, and create new 
group if needed
+      if ((totalSizeSoFar + currentSize > 
writeConfig.getClusteringMaxBytesInGroup() || schemaMismatch) 
+          && !currentGroup.isEmpty()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding clustering group - size: {} max bytes: {} num input 
slices: {} output groups: {}{}",
+            totalSizeSoFar, writeConfig.getClusteringMaxBytesInGroup(), 
currentGroup.size(), numOutputGroups,
+            schemaMismatch ? " (schema change detected)" : "");
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+        currentGroup = new ArrayList<>();
+        totalSizeSoFar = 0;
+        currentGroupSchemaHash = null;
+        
+        // If fileSliceGroups size reaches the max group, stop loop
+        if (fileSliceGroups.size() >= writeConfig.getClusteringMaxNumGroups()) 
{
+          LOG.info("Having generated the maximum number of groups: {}", 
writeConfig.getClusteringMaxNumGroups());
+          partialScheduled = true;
+          break;
+        }
+      }
+      
+      // Add to the current file-group
+      currentGroup.add(currentSlice);
+      // Set schema hash for the group if not set
+      if (currentGroupSchemaHash == null) {
+        currentGroupSchemaHash = currentFileSchemaHash;
+      }
+      totalSizeSoFar += currentSize;
+    }
+    
+    if (!currentGroup.isEmpty()) {
+      if (currentGroup.size() > 1 || 
writeConfig.shouldClusteringSingleGroup()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding final clustering group - size: {} max bytes: {} num 
input slices: {} output groups: {}",
+            totalSizeSoFar, writeConfig.getClusteringMaxBytesInGroup(), 
currentGroup.size(), numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      }
+    }
+    
+    return Pair.of(fileSliceGroups.stream().map(fileSliceGroup ->
+        HoodieClusteringGroup.newBuilder()
+            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+            .setNumOutputFileGroups(fileSliceGroup.getRight())
+            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+            .build()), partialScheduled);
+  }
+  
+  /**
+   * Get the schema hash for a file slice to enable schema-based grouping.
+   */
+  private Integer getFileSchemaHash(FileSlice fileSlice) {
+    if (fileSlice.getBaseFile().isPresent()) {
+      String filePath = fileSlice.getBaseFile().get().getPath();
+      try {
+        // Use centralized ParquetUtils method to read schema hash
+        return ParquetUtils.readSchemaHash(
+            getHoodieTable().getStorage(),
+            new StoragePath(filePath));
+      } catch (Exception e) {
+        LOG.warn("Failed to read schema hash from file: {}", filePath, e);
+        // Return a default hash if we can't read the schema
+        return 0;
+      }
+    }
+    // Return default hash for files without base file
+    return 0;
+  }
+  
+  @Override
+  public Option<HoodieClusteringPlan> 
generateClusteringPlan(ClusteringPlanActionExecutor executor, 
Lazy<List<String>> partitions) {
+    Option<HoodieClusteringPlan> planOption = 
super.generateClusteringPlan(executor, partitions);
+    
+    if (planOption.isPresent()) {
+      HoodieClusteringPlan plan = planOption.get();
+      
+      // Override the execution strategy to use 
SparkStreamCopyClusteringExecutionStrategy
+      HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
+          
.setStrategyClassName(SPARK_STREAM_COPY_CLUSTERING_EXECUTION_STRATEGY)
+          .setStrategyParams(getStrategyParams())
+          .build();
+      
+      LOG.info("Setting execution strategy to 
SparkStreamCopyClusteringExecutionStrategy for stream copy clustering");
+      
+      return Option.of(HoodieClusteringPlan.newBuilder()
+          .setStrategy(strategy)
+          .setInputGroups(plan.getInputGroups())
+          .setExtraMetadata(getExtraMetadata())
+          .setVersion(getPlanVersion())
+          .setPreserveHoodieMetadata(true)
+          .setMissingSchedulePartitions(plan.getMissingSchedulePartitions())
+          .build());
+    }
+    
+    return planOption;
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkStreamCopyClusteringExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkStreamCopyClusteringExecutionStrategy.java
new file mode 100644
index 000000000000..3e7d1d2c1567
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkStreamCopyClusteringExecutionStrategy.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.run.strategy;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.parquet.io.ParquetBinaryCopyChecker;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering execution strategy that uses binary stream copy.
+ * This strategy extends SparkBinaryCopyClusteringExecutionStrategy and 
modifies
+ * the supportBinaryStreamCopy method to skip schema checks when schema 
evolution is disabled.
+ */
+public class SparkStreamCopyClusteringExecutionStrategy<T> 
+    extends SparkBinaryCopyClusteringExecutionStrategy<T> {
+  
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkStreamCopyClusteringExecutionStrategy.class);
+  
+  public SparkStreamCopyClusteringExecutionStrategy(HoodieTable table,
+                                                    HoodieEngineContext 
engineContext,
+                                                    HoodieWriteConfig 
writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+  
+  @Override
+  public boolean supportBinaryStreamCopy(List<ClusteringGroupInfo> 
inputGroups, 
+                                        Map<String, String> strategyParams) {
+    // Check if table type is Copy-on-Write
+    if (getHoodieTable().getMetaClient().getTableType() != COPY_ON_WRITE) {
+      LOG.warn("Only support CoW table. Will fall back to common clustering 
execution strategy.");
+      return false;
+    }
+    
+    // Check if sorting is requested (not supported in binary copy)
+    Option<String[]> orderByColumnsOpt = 
+        Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
+            .map(listStr -> listStr.split(","));
+    
+    if (orderByColumnsOpt.isPresent()) {
+      LOG.warn("Not support to sort input records. Will fall back to common 
clustering execution strategy.");
+      return false;
+    }
+    
+    // Check if base file format is Parquet
+    if 
(!getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat().equals(PARQUET))
 {
+      LOG.warn("Binary Copy only support parquet as base file format for 
now.");
+      return false;
+    }
+    
+    // Check if schema evolution is enabled
+    if (!writeConfig.isBinaryCopySchemaEvolutionEnabled()) {
+      // Skip schema checking when schema evolution is disabled
+      LOG.info("Schema evolution disabled, skipping schema compatibility 
checks for binary stream copy");
+      return true;
+    }
+    
+    // Perform schema compatibility checks when schema evolution is enabled
+    LOG.info("Schema evolution enabled, performing schema compatibility checks 
for binary stream copy");
+    JavaSparkContext engineContext = 
HoodieSparkEngineContext.getSparkContext(getEngineContext());
+    
+    List<ParquetBinaryCopyChecker.ParquetFileInfo> fileStatus = 
engineContext.parallelize(inputGroups, inputGroups.size())
+        .flatMap(group -> group.getOperations().iterator())
+        .map(op -> {
+          String filePath = op.getDataFilePath();
+          return ParquetBinaryCopyChecker.collectFileInfo(
+              getHoodieTable().getStorageConf().unwrapAs(Configuration.class), 
+              filePath);
+        })
+        .collect();
+    
+    boolean compatible = ParquetBinaryCopyChecker.verifyFiles(fileStatus);
+    if (!compatible) {
+      LOG.warn("Schema compatibility check failed. Will fall back to common 
clustering execution strategy.");
+    }
+    
+    return compatible;
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileBinaryCopier.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileBinaryCopier.java
index b414ad534c5f..690f12ebdd1c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileBinaryCopier.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileBinaryCopier.java
@@ -24,7 +24,6 @@ import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Properties;
 
 /**
  * HoodieFileBinaryCopier is a high-performance utility designed for efficient 
merging of data files at the binary level.
@@ -34,7 +33,7 @@ import java.util.Properties;
  */
 public interface HoodieFileBinaryCopier {
 
-  long binaryCopy(List<StoragePath> inputFilePaths, List<StoragePath> 
outputFilePath, MessageType writeSchema, Properties props) throws IOException;
+  long binaryCopy(List<StoragePath> inputFilePaths, List<StoragePath> 
outputFilePath, MessageType writeSchema, boolean schemaEvolutionEnabled) throws 
IOException;
 
   void close() throws IOException;
 }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index e47468b54c33..c247945d9e46 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -232,6 +232,21 @@ public class ParquetUtils extends FileFormatUtils {
   public MessageType readSchema(HoodieStorage storage, StoragePath 
parquetFilePath) {
     return readMetadata(storage, 
parquetFilePath).getFileMetaData().getSchema();
   }
+  
+  /**
+   * Get the hash code of the schema from a parquet file.
+   * This is useful for quickly comparing schemas without full comparison.
+   */
+  public static Integer readSchemaHash(HoodieStorage storage, StoragePath 
parquetFilePath) {
+    try {
+      ParquetUtils parquetUtils = new ParquetUtils();
+      MessageType schema = parquetUtils.readSchema(storage, parquetFilePath);
+      return schema.hashCode();
+    } catch (Exception e) {
+      LOG.warn("Failed to read schema hash from file: " + parquetFilePath, e);
+      return 0;
+    }
+  }
 
   @Override
   public Map<String, String> readFooter(HoodieStorage storage, boolean 
required,
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetBinaryCopyBase.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetBinaryCopyBase.java
index 3bbd46a013e3..46640b619e02 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetBinaryCopyBase.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetBinaryCopyBase.java
@@ -120,10 +120,17 @@ public abstract class HoodieParquetBinaryCopyBase 
implements Closeable {
   protected MessageType requiredSchema = null;
 
   protected Configuration conf;
+  
+  // Flag to control schema evolution behavior
+  protected Boolean schemaEvolutionEnabled = null;
 
   public HoodieParquetBinaryCopyBase(Configuration conf) {
     this.conf = conf;
   }
+  
+  public void setSchemaEvolutionEnabled(boolean enabled) {
+    this.schemaEvolutionEnabled = enabled;
+  }
 
   protected void initFileWriter(Path outPutFile, CompressionCodecName 
newCodecName, MessageType schema) {
     try {
@@ -188,9 +195,14 @@ public abstract class HoodieParquetBinaryCopyBase 
implements Closeable {
     for (int i = 0; i < columnsInOrder.size(); i++) {
       ColumnChunkMetaData chunk = columnsInOrder.get(i);
       ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+      if (schemaEvolutionEnabled == null) {
+        throw new HoodieException("The variable 'schemaEvolutionEnabled' is 
supposed to be set in "
+                + "binaryCopy() before calling this method 
processBlocksFromReader");
+      }
 
       // resolve the conflict schema between avro parquet write support and 
spark native parquet write support
-      if (descriptor == null) {
+      // Only attempt legacy conversion if schema evolution is enabled
+      if (descriptor == null && schemaEvolutionEnabled) {
         String[] path = chunk.getPath().toArray();
         path = Arrays.copyOf(path, path.length);
         if (convertLegacy3LevelArray(path) || convertLegacyMap(path)) {
@@ -226,6 +238,14 @@ public abstract class HoodieParquetBinaryCopyBase 
implements Closeable {
       CompressionCodecName newCodecName = this.newCodecName == null ? 
chunk.getCodec() : this.newCodecName;
 
       if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
+        // Check if this is NOT the FILENAME_METADATA_FIELD and schema 
evolution is disabled
+        if 
(!chunk.getPath().toDotString().equals(HoodieRecord.FILENAME_METADATA_FIELD)) {
+          if (!schemaEvolutionEnabled) {
+            throw new HoodieException("Column masking for '" + 
chunk.getPath().toDotString() 
+                + "' requires schema evolution to be enabled. "
+                + "Set 
'hoodie.clustering.plan.strategy.binary.copy.schema.evolution.enable' to 
true.");
+          }
+        }
         // Mask column and compress it again.
         Binary maskValue = maskColumns.get(chunk.getPath());
         if (maskValue != null) {
@@ -261,6 +281,16 @@ public abstract class HoodieParquetBinaryCopyBase 
implements Closeable {
         .stream()
         .filter(c -> !converted.contains(c))
         .collect(Collectors.toList());
+    
+    // If schema evolution is disabled and there are missing columns, throw an 
exception
+    if (!schemaEvolutionEnabled && !missedColumns.isEmpty()) {
+      String missingColumnsStr = missedColumns.stream()
+          .map(c -> String.join(".", c.getPath()))
+          .collect(Collectors.joining(", "));
+      throw new HoodieException("Schema evolution is disabled but found 
missing columns in input file: " 
+          + missingColumnsStr + ". All input files must have the same schema 
when schema evolution is disabled.");
+    }
+    
     for (ColumnDescriptor descriptor : missedColumns) {
       addNullColumn(
           descriptor,
@@ -644,18 +674,32 @@ public abstract class HoodieParquetBinaryCopyBase 
implements Closeable {
   @VisibleForTesting
   public boolean convertLegacy3LevelArray(String[] path) {
     boolean changed = false;
-    for (int i = 0; i < path.length; i++) {
-      try {
-        String[] subPath = Arrays.copyOf(path, i + 1);
-        Type type = this.requiredSchema.getType(subPath);
-        if (type.getOriginalType() == LIST && "bag".equals(path[i + 1])) {
-          // Convert from xxx.bag.array to xxx.list.element
-          path[i + 1] = "list";
-          path[i + 2] = "element";
-          changed = true;
+    
+    // For schema evolution, also check for legacy patterns even if the field 
doesn't exist in the schema
+    for (int i = 0; i < path.length - 2; i++) {
+      if ("bag".equals(path[i + 1]) && "array_element".equals(path[i + 2])) {
+        // Convert from xxx.bag.array_element to xxx.list.element
+        path[i + 1] = "list";
+        path[i + 2] = "element";
+        changed = true;
+        break;
+      }
+    }
+    
+    if (!changed) {
+      for (int i = 0; i < path.length; i++) {
+        try {
+          String[] subPath = Arrays.copyOf(path, i + 1);
+          Type type = this.requiredSchema.getType(subPath);
+          if (type.getOriginalType() == LIST && i + 1 < path.length && 
"bag".equals(path[i + 1])) {
+            // Convert from xxx.bag.array to xxx.list.element
+            path[i + 1] = "list";
+            path[i + 2] = "element";
+            changed = true;
+          }
+        } catch (InvalidRecordException e) {
+          LOG.debug("field not found due to schema evolution, nothing need to 
do");
         }
-      } catch (InvalidRecordException e) {
-        LOG.debug("field not found due to schema evolution, nothing need to 
do");
       }
     }
     return changed;
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetFileBinaryCopier.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetFileBinaryCopier.java
index 140cfd437d73..7ca88182d83d 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetFileBinaryCopier.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetFileBinaryCopier.java
@@ -41,7 +41,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Queue;
 import java.util.Set;
 
@@ -99,7 +98,10 @@ public class HoodieParquetFileBinaryCopier extends 
HoodieParquetBinaryCopyBase i
   public long binaryCopy(List<StoragePath> inputFilePaths,
                          List<StoragePath> outputFilePath,
                          MessageType writeSchema,
-                         Properties props) throws IOException {
+                         boolean schemaEvolutionEnabled) throws IOException {
+    // Set schema evolution enabled flag
+    setSchemaEvolutionEnabled(schemaEvolutionEnabled);
+    
     openInputFiles(inputFilePaths, conf);
     initFileWriter(new Path(outputFilePath.get(0).toUri()), codecName, 
writeSchema);
     initNextReader();
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index f86ae394b10e..17e80ab63f95 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -397,4 +398,108 @@ public class TestParquetUtils extends 
HoodieCommonTestHarness {
       return Arrays.asList(new String[]{partitionField});
     }
   }
+
+  @Test
+  public void testReadSchemaHash() throws Exception {
+    // Given: Create a parquet file with a specific schema
+    List<String> rowKeys = Arrays.asList("row1", "row2", "row3");
+    String filePath = Paths.get(basePath, 
"test_schema_hash.parquet").toUri().toString();
+    writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys);
+    
+    StoragePath storagePath = new StoragePath(filePath);
+    
+    // When: Reading schema hash
+    Integer schemaHash = 
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(filePath), storagePath);
+    
+    // Then: Should return a valid hash
+    assertTrue(schemaHash != null, "Schema hash should not be null");
+    assertTrue(schemaHash != 0, "Schema hash should not be zero (default error 
value)");
+    
+    // Verify consistency - reading same file should return same hash
+    Integer secondRead = 
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(filePath), storagePath);
+    assertEquals(schemaHash, secondRead, "Schema hash should be consistent 
across reads");
+  }
+
+  @Test
+  public void testReadSchemaHash_DifferentSchemas() throws Exception {
+    // Given: Create two parquet files with different schemas
+    List<String> rowKeys = Arrays.asList("row1", "row2");
+    
+    // File 1 with original schema
+    String filePath1 = Paths.get(basePath, 
"test_schema1.parquet").toUri().toString();
+    writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath1, rowKeys);
+    
+    // File 2 with extended schema (add a field)
+    String filePath2 = Paths.get(basePath, 
"test_schema2.parquet").toUri().toString();
+    writeParquetFileWithExtendedSchema(filePath2, rowKeys);
+    
+    // When: Reading schema hashes from both files
+    Integer hash1 = 
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(filePath1), new 
StoragePath(filePath1));
+    Integer hash2 = 
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(filePath2), new 
StoragePath(filePath2));
+    
+    // Then: Should have different hashes for different schemas
+    assertTrue(hash1 != null && hash2 != null, "Both schema hashes should be 
valid");
+    assertTrue(!hash1.equals(hash2), "Different schemas should have different 
hash codes");
+  }
+
+  @Test
+  public void testReadSchemaHash_NonExistentFile() throws Exception {
+    // Given: Non-existent file path
+    StoragePath nonExistentPath = new 
StoragePath("/non/existent/file.parquet");
+    
+    // When: Reading schema hash from non-existent file
+    Integer schemaHash = 
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(basePath), 
nonExistentPath);
+    
+    // Then: Should return 0 (error default value)
+    assertEquals(Integer.valueOf(0), schemaHash, "Non-existent file should 
return default error value 0");
+  }
+
+  @Test
+  public void testReadSchemaHash_MatchesDirectSchemaRead() throws Exception {
+    // Given: Create a parquet file
+    List<String> rowKeys = Arrays.asList("row1", "row2", "row3");
+    String filePath = Paths.get(basePath, 
"test_direct_schema.parquet").toUri().toString();
+    writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys);
+    
+    StoragePath storagePath = new StoragePath(filePath);
+    
+    // When: Reading schema hash vs direct schema read
+    Integer schemaHashFromUtils = 
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(filePath), storagePath);
+    MessageType directSchema = 
parquetUtils.readSchema(HoodieTestUtils.getStorage(filePath), storagePath);
+    Integer directSchemaHash = directSchema.hashCode();
+    
+    // Then: Hash from utility method should match direct schema hash
+    assertEquals(directSchemaHash, schemaHashFromUtils, 
+        "Schema hash from utility should match direct schema.hashCode()");
+  }
+
+  private void writeParquetFileWithExtendedSchema(String filePath, 
List<String> rowKeys) throws Exception {
+    // Create an extended schema with an additional field
+    Schema extendedSchema = Schema.createRecord("record", "", "", false);
+    List<Schema.Field> fields = new ArrayList<>();
+    fields.add(new Schema.Field("_row_key", Schema.create(Schema.Type.STRING), 
"", (Object) null));
+    fields.add(new Schema.Field("time", Schema.create(Schema.Type.LONG), "", 
(Object) null));
+    fields.add(new Schema.Field("number", Schema.create(Schema.Type.LONG), "", 
(Object) null));
+    fields.add(new Schema.Field("extra_field", 
createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE)); // 
Additional field
+    extendedSchema.setFields(fields);
+
+    BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 
-1, BloomFilterTypeCode.SIMPLE.name());
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(extendedSchema), extendedSchema, 
Option.of(filter), new Properties());
+
+    ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, 
CompressionCodecName.GZIP,
+        120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE);
+
+    for (String rowKey : rowKeys) {
+      GenericRecord record = new GenericData.Record(extendedSchema);
+      record.put("_row_key", rowKey);
+      record.put("time", 1234567L);
+      record.put("number", 12345L);
+      record.put("extra_field", "extra_value"); // Set the extra field
+      writer.write(record);
+      writeSupport.add(rowKey);
+    }
+    writer.close();
+  }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetBinaryCopyBaseSchemaEvolution.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetBinaryCopyBaseSchemaEvolution.java
new file mode 100644
index 000000000000..e7e1ca7d442d
--- /dev/null
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetBinaryCopyBaseSchemaEvolution.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.parquet.io;
+
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for HoodieParquetBinaryCopyBase schema evolution behavior.
+ */
+public class TestHoodieParquetBinaryCopyBaseSchemaEvolution {
+
+  @Mock
+  private CompressionConverter.TransParquetFileReader reader;
+  @Mock
+  private ParquetMetadata parquetMetadata;
+  @Mock
+  private FileMetaData fileMetaData;
+  @Mock
+  private BlockMetaData blockMetaData;
+  @Mock
+  private ColumnChunkMetaData columnChunkMetaData;
+  @Mock
+  private EncodingStats encodingStats;
+
+  private TestableHoodieParquetBinaryCopyBase copyBase;
+  private MessageType requiredSchema;
+  private MessageType fileSchema;
+
+  @BeforeEach
+  public void setUp() {
+    MockitoAnnotations.openMocks(this);
+    
+    // Setup test schemas
+    requiredSchema = Types.buildMessage()
+        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
PrimitiveType.Repetition.REQUIRED)
+            .named("field1"))
+        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, 
PrimitiveType.Repetition.OPTIONAL)
+            .named("field2"))
+        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
PrimitiveType.Repetition.OPTIONAL)
+            .named("field3"))
+        .named("TestRecord");
+    
+    // File schema missing field3
+    fileSchema = Types.buildMessage()
+        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
PrimitiveType.Repetition.REQUIRED)
+            .named("field1"))
+        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, 
PrimitiveType.Repetition.OPTIONAL)
+            .named("field2"))
+        .named("TestRecord");
+    
+    copyBase = spy(new TestableHoodieParquetBinaryCopyBase(new 
Configuration()));
+    copyBase.requiredSchema = requiredSchema;
+    
+    // Setup mocks
+    when(reader.getFooter()).thenReturn(parquetMetadata);
+    when(parquetMetadata.getFileMetaData()).thenReturn(fileMetaData);
+    when(fileMetaData.getSchema()).thenReturn(fileSchema);
+    when(columnChunkMetaData.getEncodingStats()).thenReturn(encodingStats);
+    
+    // Mock the addNullColumn method to avoid complex setup
+    doNothing().when(copyBase).addNullColumn(any(ColumnDescriptor.class), 
anyLong(), any(EncodingStats.class), 
+        any(), any(MessageType.class), any(CompressionCodecName.class));
+  }
+
+  @Test
+  public void testSchemaEvolutionEnabled_AllowsMissingColumns() throws 
Exception {
+    // Given: Schema evolution is enabled (default)
+    copyBase.setSchemaEvolutionEnabled(true);
+    
+    List<ColumnChunkMetaData> columnsInOrder = 
Arrays.asList(columnChunkMetaData);
+    when(blockMetaData.getColumns()).thenReturn(columnsInOrder);
+    
+    // Setup mock to simulate processing logic without the complex 
ParquetFileWriter setup
+    copyBase.setupForTesting(reader, blockMetaData, columnsInOrder);
+    
+    // When: Processing blocks with missing columns
+    // Then: Should not throw exception
+    assertDoesNotThrow(() -> {
+      copyBase.testProcessMissedColumns();
+    });
+    
+    // Should call addNullColumn for missing field3
+    verify(copyBase, times(1)).addNullColumn(any(ColumnDescriptor.class), 
anyLong(), 
+        any(EncodingStats.class), any(), eq(requiredSchema), 
any(CompressionCodecName.class));
+  }
+
+  @Test
+  public void testSchemaEvolutionDisabled_ThrowsExceptionOnMissingColumns() 
throws Exception {
+    // Given: Schema evolution is disabled
+    copyBase.setSchemaEvolutionEnabled(false);
+    
+    List<ColumnChunkMetaData> columnsInOrder = 
Arrays.asList(columnChunkMetaData);
+    when(blockMetaData.getColumns()).thenReturn(columnsInOrder);
+    
+    copyBase.setupForTesting(reader, blockMetaData, columnsInOrder);
+    
+    // When: Processing blocks with missing columns
+    // Then: Should throw HoodieException
+    HoodieException exception = assertThrows(HoodieException.class, () -> {
+      copyBase.testProcessMissedColumns();
+    });
+    
+    // Verify exception message contains information about missing columns
+    assertEquals("Schema evolution is disabled but found missing columns in 
input file: field3. "
+        + "All input files must have the same schema when schema evolution is 
disabled.", exception.getMessage());
+    
+    // Should not call addNullColumn when evolution is disabled
+    verify(copyBase, never()).addNullColumn(any(ColumnDescriptor.class), 
anyLong(), 
+        any(EncodingStats.class), any(), any(MessageType.class), 
any(CompressionCodecName.class));
+  }
+
+  @Test
+  public void testSchemaEvolutionDisabled_NoMissingColumns_DoesNotThrow() 
throws Exception {
+    // Given: Schema evolution is disabled, but file schema matches required 
schema
+    copyBase.setSchemaEvolutionEnabled(false);
+    
+    // File schema with all required fields
+    MessageType completeFileSchema = Types.buildMessage()
+        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
PrimitiveType.Repetition.REQUIRED)
+            .named("field1"))
+        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, 
PrimitiveType.Repetition.OPTIONAL)
+            .named("field2"))
+        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
PrimitiveType.Repetition.OPTIONAL)
+            .named("field3"))
+        .named("TestRecord");
+    
+    when(fileMetaData.getSchema()).thenReturn(completeFileSchema);
+    
+    List<ColumnChunkMetaData> columnsInOrder = 
Arrays.asList(columnChunkMetaData);
+    when(blockMetaData.getColumns()).thenReturn(columnsInOrder);
+    
+    copyBase.setupForTesting(reader, blockMetaData, columnsInOrder);
+    
+    // When: Processing blocks with no missing columns
+    // Then: Should not throw exception
+    assertDoesNotThrow(() -> {
+      copyBase.testProcessMissedColumns();
+    });
+    
+    // Should not call addNullColumn when no columns are missing
+    verify(copyBase, never()).addNullColumn(any(ColumnDescriptor.class), 
anyLong(), 
+        any(EncodingStats.class), any(), any(MessageType.class), 
any(CompressionCodecName.class));
+  }
+
+  @Test
+  public void testSchemaEvolutionDisabled_SkipsLegacyConversion() throws 
Exception {
+    // Given: Schema evolution is disabled
+    copyBase.setSchemaEvolutionEnabled(false);
+    
+    // Setup a column that would normally trigger legacy conversion
+    ColumnChunkMetaData legacyColumn = mock(ColumnChunkMetaData.class);
+    ColumnPath legacyPath = 
ColumnPath.fromDotString("testArray.bag.array_element");
+    when(legacyColumn.getPath()).thenReturn(legacyPath);
+    
+    List<ColumnChunkMetaData> columnsInOrder = Arrays.asList(legacyColumn);
+    when(blockMetaData.getColumns()).thenReturn(columnsInOrder);
+    
+    // Create descriptors map that doesn't contain the legacy path (simulating 
descriptor == null case)
+    Map<ColumnPath, ColumnDescriptor> descriptorsMap = new HashMap<>();
+    
+    copyBase.setupForTesting(reader, blockMetaData, columnsInOrder);
+    
+    // When: Processing columns with legacy paths
+    boolean legacyConversionAttempted = 
copyBase.testLegacyConversionLogic(legacyPath, descriptorsMap);
+    
+    // Then: Legacy conversion should be skipped
+    assertEquals(false, legacyConversionAttempted, "Legacy conversion should 
be skipped when schema evolution is disabled");
+  }
+
+  @Test
+  public void testSchemaEvolutionEnabled_AllowsLegacyConversion() throws 
Exception {
+    // Given: Schema evolution is enabled
+    copyBase.setSchemaEvolutionEnabled(true);
+    
+    // Setup a column that would trigger legacy conversion
+    ColumnChunkMetaData legacyColumn = mock(ColumnChunkMetaData.class);
+    ColumnPath legacyPath = 
ColumnPath.fromDotString("testArray.bag.array_element");
+    when(legacyColumn.getPath()).thenReturn(legacyPath);
+    
+    List<ColumnChunkMetaData> columnsInOrder = Arrays.asList(legacyColumn);
+    when(blockMetaData.getColumns()).thenReturn(columnsInOrder);
+    
+    // Create descriptors map that doesn't contain the legacy path
+    Map<ColumnPath, ColumnDescriptor> descriptorsMap = new HashMap<>();
+    
+    copyBase.setupForTesting(reader, blockMetaData, columnsInOrder);
+    
+    // When: Processing columns with legacy paths
+    boolean legacyConversionAttempted = 
copyBase.testLegacyConversionLogic(legacyPath, descriptorsMap);
+    
+    // Then: Legacy conversion should be attempted
+    assertEquals(true, legacyConversionAttempted, "Legacy conversion should be 
attempted when schema evolution is enabled");
+  }
+
+  /**
+   * Testable subclass that exposes internal methods and provides test setup.
+   */
+  private static class TestableHoodieParquetBinaryCopyBase extends 
HoodieParquetBinaryCopyBase {
+    private CompressionConverter.TransParquetFileReader testReader;
+    private BlockMetaData testBlock;
+    private List<ColumnChunkMetaData> testColumnsInOrder;
+
+    public TestableHoodieParquetBinaryCopyBase(Configuration conf) {
+      super(conf);
+    }
+
+    public void setupForTesting(CompressionConverter.TransParquetFileReader 
reader, 
+                               BlockMetaData block, List<ColumnChunkMetaData> 
columnsInOrder) {
+      this.testReader = reader;
+      this.testBlock = block;
+      this.testColumnsInOrder = columnsInOrder;
+    }
+
+    public void testProcessMissedColumns() throws Exception {
+      // Simulate the missed columns processing logic
+      ParquetMetadata meta = testReader.getFooter();
+      ColumnChunkMetaData columnChunkMetaData = testColumnsInOrder.get(0);
+      EncodingStats encodingStats = columnChunkMetaData.getEncodingStats();
+      
+      List<ColumnDescriptor> missedColumns = missedColumns(requiredSchema, 
meta.getFileMetaData().getSchema())
+          .stream()
+          .collect(Collectors.toList());
+      
+      // If schema evolution is disabled and there are missing columns, throw 
an exception
+      if (!schemaEvolutionEnabled && !missedColumns.isEmpty()) {
+        String missingColumnsStr = missedColumns.stream()
+            .map(c -> String.join(".", c.getPath()))
+            .collect(Collectors.joining(", "));
+        throw new HoodieException("Schema evolution is disabled but found 
missing columns in input file: " 
+            + missingColumnsStr + ". All input files must have the same schema 
when schema evolution is disabled.");
+      }
+      
+      for (ColumnDescriptor descriptor : missedColumns) {
+        addNullColumn(descriptor, 100L, encodingStats, null, requiredSchema, 
CompressionCodecName.SNAPPY);
+      }
+    }
+
+    public boolean testLegacyConversionLogic(ColumnPath columnPath, 
Map<ColumnPath, ColumnDescriptor> descriptorsMap) {
+      ColumnDescriptor descriptor = descriptorsMap.get(columnPath);
+      
+      // resolve the conflict schema between avro parquet write support and 
spark native parquet write support
+      // Only attempt legacy conversion if schema evolution is enabled
+      if (descriptor == null && schemaEvolutionEnabled) {
+        String[] path = columnPath.toArray();
+        path = Arrays.copyOf(path, path.length);
+        if (convertLegacy3LevelArray(path) || convertLegacyMap(path)) {
+          return true; // Legacy conversion was attempted
+        }
+      }
+      return false; // Legacy conversion was not attempted
+    }
+
+    private List<ColumnDescriptor> missedColumns(MessageType requiredSchema, 
MessageType fileSchema) {
+      return requiredSchema.getColumns().stream()
+          .filter(col -> !fileSchema.containsPath(col.getPath()))
+          .collect(Collectors.toList());
+    }
+
+    // Mock implementation to avoid complex ParquetFileWriter setup
+    protected void addNullColumn(ColumnDescriptor descriptor, long 
totalChunkValues, EncodingStats encodingStats,
+                                Object writer, MessageType schema, 
CompressionCodecName newCodecName) {
+      // Mock implementation - do nothing
+    }
+
+    @Override
+    protected Map<String, String> finalizeMetadata() {
+      return new HashMap<>();
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetFileBinaryCopier.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetFileBinaryCopier.java
index 20a62978a4f3..9d85f371f008 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetFileBinaryCopier.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetFileBinaryCopier.java
@@ -70,7 +70,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
@@ -125,7 +124,7 @@ public class TestHoodieParquetFileBinaryCopier {
         .map(StoragePath::new)
         .collect(Collectors.toList());
     StoragePath outputPath = new StoragePath(outputFile);
-    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
schema, new Properties());
+    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
schema, true);
     writer.close();
     verify(schema, CompressionCodecName.GZIP);
   }
@@ -143,7 +142,7 @@ public class TestHoodieParquetFileBinaryCopier {
         .map(StoragePath::new)
         .collect(Collectors.toList());
     StoragePath outputPath = new StoragePath(outputFile);
-    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
schema, new Properties());
+    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
schema, true);
     writer.close();
     verify(schema, CompressionCodecName.ZSTD);
   }
@@ -171,7 +170,7 @@ public class TestHoodieParquetFileBinaryCopier {
         .map(StoragePath::new)
         .collect(Collectors.toList());
     StoragePath outputPath = new StoragePath(outputFile);
-    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
schema1, new Properties());
+    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
schema1, true);
     writer.close();
     verify(schema1, CompressionCodecName.UNCOMPRESSED);
   }
@@ -201,7 +200,7 @@ public class TestHoodieParquetFileBinaryCopier {
 
     writer = parquetFileBinaryCopier(requiredSchema, "GZIP");
     StoragePath outputPath = new StoragePath(outputFile);
-    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
requiredSchema, new Properties());
+    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
requiredSchema, true);
     writer.close();
 
     List<ColumnDescriptor> columns = inputSchema.getColumns();
@@ -271,7 +270,7 @@ public class TestHoodieParquetFileBinaryCopier {
 
     writer = parquetFileBinaryCopier(requiredSchema, "GZIP");
     StoragePath outputPath = new StoragePath(outputFile);
-    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
requiredSchema, new Properties());
+    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
requiredSchema, true);
     writer.close();
 
     List<ColumnDescriptor> columns = inputSchema.getColumns();
@@ -313,7 +312,7 @@ public class TestHoodieParquetFileBinaryCopier {
 
     writer = parquetFileBinaryCopier(requiredSchema, "GZIP");
     StoragePath outputPath = new StoragePath(outputFile);
-    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
requiredSchema, new Properties());
+    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
requiredSchema, true);
     writer.close();
 
     List<ColumnDescriptor> columns = inputSchema.getColumns();
@@ -388,7 +387,7 @@ public class TestHoodieParquetFileBinaryCopier {
 
     writer = parquetFileBinaryCopier(requiredSchema, "GZIP");
     StoragePath outputPath = new StoragePath(outputFile);
-    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
requiredSchema, new Properties());
+    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
requiredSchema, true);
     writer.close();
     List<ColumnDescriptor> columns = inputSchema.getColumns();
     Assertions.assertEquals(5, columns.size());
@@ -420,7 +419,7 @@ public class TestHoodieParquetFileBinaryCopier {
         .map(StoragePath::new)
         .collect(Collectors.toList());
     StoragePath outputPath = new StoragePath(outputFile);
-    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
schema, new Properties());
+    writer.binaryCopy(inputPaths, Collections.singletonList(outputPath), 
schema, true);
     writer.close();
     verify(schema, CompressionCodecName.GZIP);
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringBinaryCopyStrategy.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringBinaryCopyStrategy.scala
index 696e9bba6b6f..fec133b647f4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringBinaryCopyStrategy.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringBinaryCopyStrategy.scala
@@ -47,7 +47,8 @@ class TestClusteringBinaryCopyStrategy extends 
HoodieSparkProcedureTestBase {
               "hoodie.sql.bulk.insert.enable" -> "true",
               "hoodie.sql.insert.mode" -> "non-strict",
               "hoodie.combine.before.insert" -> "false",
-              "hoodie.parquet.small.file.limit" -> "-1"
+              "hoodie.parquet.small.file.limit" -> "-1",
+              
"hoodie.clustering.plan.strategy.binary.copy.schema.evolution.enable" -> "true"
             )
           case "insert" =>
             Map(
@@ -55,7 +56,8 @@ class TestClusteringBinaryCopyStrategy extends 
HoodieSparkProcedureTestBase {
               "hoodie.sql.insert.mode" -> "non-strict",
               "hoodie.combine.before.insert" -> "false",
               "spark.hadoop.parquet.avro.write-old-list-structure" -> "false",
-              "hoodie.parquet.small.file.limit" -> "-1"
+              "hoodie.parquet.small.file.limit" -> "-1",
+              
"hoodie.clustering.plan.strategy.binary.copy.schema.evolution.enable" -> "true"
             )
         }
         withSQLConf(conf.toSeq: _*) {

Reply via email to