This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 034cf019346d [HUDI-9685] Merge row groups for file stitching with
Parquet API and group by schema (#13674)
034cf019346d is described below
commit 034cf019346d618a7f6ac8c31deff0a73c39c317
Author: Xinli Shang <[email protected]>
AuthorDate: Sun Aug 24 06:27:31 2025 -0700
[HUDI-9685] Merge row groups for file stitching with Parquet API and group
by schema (#13674)
* [HUDI-9685] Add schema evolution control feature for binary copy
operations for file merging
* Address comments: Change to default of disabling schema evolution
* Add a setting method to change the new flag
* Refactor the code
* Change the name of config
* Rename the config name to follow the convention
* retrigger CI
* Fix test error
* Change the config name
* Add sparkStreamCopyClustering strategy
* Remove unused class
---
.../apache/hudi/config/HoodieClusteringConfig.java | 19 ++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../org/apache/hudi/io/HoodieBinaryCopyHandle.java | 29 +-
.../config/TestHoodieWriteConfigFileStitching.java | 73 +++++
.../TestHoodieBinaryCopyHandleSchemaEvolution.java | 221 ++++++++++++++
.../TestPartitionAwareClusteringPlanStrategy.java | 6 +-
.../SparkStreamCopyClusteringPlanStrategy.java | 217 ++++++++++++++
...SparkStreamCopyClusteringExecutionStrategy.java | 110 +++++++
.../hudi/io/storage/HoodieFileBinaryCopier.java | 3 +-
.../org/apache/hudi/common/util/ParquetUtils.java | 15 +
.../parquet/io/HoodieParquetBinaryCopyBase.java | 68 ++++-
.../parquet/io/HoodieParquetFileBinaryCopier.java | 6 +-
.../apache/hudi/common/util/TestParquetUtils.java | 105 +++++++
...HoodieParquetBinaryCopyBaseSchemaEvolution.java | 324 +++++++++++++++++++++
.../io/TestHoodieParquetFileBinaryCopier.java | 17 +-
.../TestClusteringBinaryCopyStrategy.scala | 6 +-
16 files changed, 1193 insertions(+), 30 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index c1e0887e15cd..766b64f0eb1f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -57,8 +57,12 @@ public class HoodieClusteringConfig extends HoodieConfig {
"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy";
public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
+ public static final String SPARK_STREAM_COPY_CLUSTERING_PLAN_STRATEGY =
+
"org.apache.hudi.client.clustering.plan.strategy.SparkStreamCopyClusteringPlanStrategy";
public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
+ public static final String SPARK_STREAM_COPY_CLUSTERING_EXECUTION_STRATEGY =
+
"org.apache.hudi.client.clustering.run.strategy.SparkStreamCopyClusteringExecutionStrategy";
public static final String SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy";
public static final String
SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY =
@@ -343,6 +347,16 @@ public class HoodieClusteringConfig extends HoodieConfig {
+ "Please exercise caution while setting this config, especially
when clustering is done very frequently. This could lead to race condition in "
+ "rare scenarios, for example, when the clustering completes after
instants are fetched but before rollback completed.");
+ public static final ConfigProperty<Boolean>
FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE = ConfigProperty
+ .key(CLUSTERING_STRATEGY_PARAM_PREFIX +
"binary.copy.schema.evolution.enable")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Enable schema evolution support for binary file
stitching during clustering. "
+ + "When enabled, allows clustering of files with different but
compatible schemas (e.g., files with added columns). "
+ + "When disabled (default), only files with identical schemas will
be clustered together, providing better performance "
+ + "but requiring schema consistency across all files in a clustering
group.");
+
/**
* @deprecated Use {@link #PLAN_STRATEGY_CLASS_NAME} and its methods instead
*/
@@ -611,6 +625,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
+ public Builder withFileStitchingBinaryCopySchemaEvolutionEnabled(Boolean
enabled) {
+
clusteringConfig.setValue(FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE,
String.valueOf(enabled));
+ return this;
+ }
+
public Builder withDataOptimizeStrategy(String strategy) {
clusteringConfig.setValue(LAYOUT_OPTIMIZE_STRATEGY, strategy);
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 1db77f325fc8..dd244f4aefc7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1847,6 +1847,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return
getBoolean(HoodieClusteringConfig.ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT);
}
+ public boolean isBinaryCopySchemaEvolutionEnabled() {
+ return
getBooleanOrDefault(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE);
+ }
+
public int getInlineClusterMaxCommits() {
return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBinaryCopyHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBinaryCopyHandle.java
index b8ad1ba797fd..47bd7b0aae42 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBinaryCopyHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBinaryCopyHandle.java
@@ -30,6 +30,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.util.HoodieFileMetadataMerger;
import org.apache.hudi.io.storage.HoodieFileBinaryCopier;
import org.apache.hudi.parquet.io.HoodieParquetFileBinaryCopier;
+import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
@@ -63,6 +64,26 @@ public class HoodieBinaryCopyHandle<T, I, K, O> extends
HoodieWriteHandle<T, I,
protected long recordsWritten = 0;
protected long insertRecordsWritten = 0;
+ private MessageType getWriteSchema(HoodieWriteConfig config,
List<StoragePath> inputFiles, Configuration conf, HoodieTable<?, ?, ?, ?>
table) {
+ if (!config.isBinaryCopySchemaEvolutionEnabled() && !inputFiles.isEmpty())
{
+ // When schema evolution is disabled, use the schema from the first
input file
+ // All files should have the same schema in this case
+ try {
+ ParquetUtils parquetUtils = new ParquetUtils();
+ MessageType fileSchema = parquetUtils.readSchema(table.getStorage(),
inputFiles.get(0));
+ LOG.info("Binary copy schema evolution disabled. Using schema from
input file: " + inputFiles.get(0));
+ return fileSchema;
+ } catch (Exception e) {
+ LOG.error("Failed to read schema from input file", e);
+ throw new HoodieIOException("Failed to read schema from input file
when schema evolution is disabled: " + inputFiles.get(0),
+ e instanceof IOException ? (IOException) e : new IOException(e));
+ }
+ } else {
+ // Default behavior: use the table's write schema for evolution
+ return new AvroSchemaConverter(conf).convert(writeSchemaWithMetaFields);
+ }
+ }
+
public HoodieBinaryCopyHandle(
HoodieWriteConfig config,
String instantTime,
@@ -74,7 +95,9 @@ public class HoodieBinaryCopyHandle<T, I, K, O> extends
HoodieWriteHandle<T, I,
super(config, instantTime, partitionPath, fileId, hoodieTable,
taskContextSupplier, true);
this.inputFiles = inputFilePaths;
this.conf = hoodieTable.getStorageConf().unwrapAs(Configuration.class);
- this.writeScheMessageType = new
AvroSchemaConverter(conf).convert(writeSchemaWithMetaFields);
+ // When schema evolution is disabled, use the schema from one of the input
files
+ // Otherwise, use the table's write schema
+ this.writeScheMessageType = getWriteSchema(config, inputFilePaths, conf,
hoodieTable);
HoodieFileMetadataMerger fileMetadataMerger = new
HoodieFileMetadataMerger();
this.path = makeNewPath(partitionPath);
writeStatus.setFileId(fileId);
@@ -92,7 +115,9 @@ public class HoodieBinaryCopyHandle<T, I, K, O> extends
HoodieWriteHandle<T, I,
HoodieTimer timer = HoodieTimer.start();
long records = 0;
try {
- records = this.writer.binaryCopy(inputFiles,
Collections.singletonList(path), writeScheMessageType, config.getProps());
+ boolean schemaEvolutionEnabled =
config.isBinaryCopySchemaEvolutionEnabled();
+ LOG.info("Schema evolution enabled for binary copy: {}",
schemaEvolutionEnabled);
+ records = this.writer.binaryCopy(inputFiles,
Collections.singletonList(path), writeScheMessageType, schemaEvolutionEnabled);
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
} finally {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfigFileStitching.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfigFileStitching.java
new file mode 100644
index 000000000000..09d2381f8fa4
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfigFileStitching.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Tests for file stitching binary copy schema evolution configuration.
+ */
+public class TestHoodieWriteConfigFileStitching {
+
+ @Test
+ public void testFileStitchingBinaryCopySchemaEvolutionConfig() {
+ // Test default value (should be false)
+ HoodieWriteConfig config1 = HoodieWriteConfig.newBuilder()
+ .withPath("/test/path")
+ .build();
+ assertFalse(config1.isBinaryCopySchemaEvolutionEnabled(),
+ "File stitching binary copy schema evolution should be disabled by
default");
+
+ // Test explicitly setting to false
+ Properties props = new Properties();
+
props.setProperty(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE.key(),
"false");
+ HoodieWriteConfig config2 = HoodieWriteConfig.newBuilder()
+ .withPath("/test/path")
+ .withProps(props)
+ .build();
+ assertFalse(config2.isBinaryCopySchemaEvolutionEnabled(),
+ "File stitching binary copy schema evolution should be disabled when
explicitly set to false");
+
+ // Test explicitly setting to true
+
props.setProperty(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE.key(),
"true");
+ HoodieWriteConfig config3 = HoodieWriteConfig.newBuilder()
+ .withPath("/test/path")
+ .withProps(props)
+ .build();
+ assertTrue(config3.isBinaryCopySchemaEvolutionEnabled(),
+ "File stitching binary copy schema evolution should be enabled when
explicitly set to true");
+ }
+
+ @Test
+ public void testClusteringConfigProperty() {
+ // Test that the clustering config property has the correct default value.
+ // NOTE(review): the assertEquals below compares the config key with itself,
+ // so it always passes; assert the expected literal key string instead to
+ // actually pin the config name.
+
assertEquals(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE.key(),
+
HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE.key(),
+ "Config key should match expected value");
+
+
assertFalse(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE.defaultValue(),
+ "Default value should be false to disable schema evolution");
+ }
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieBinaryCopyHandleSchemaEvolution.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieBinaryCopyHandleSchemaEvolution.java
new file mode 100644
index 000000000000..62e9269bc1ce
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieBinaryCopyHandleSchemaEvolution.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for HoodieBinaryCopyHandle schema evolution behavior.
+ */
+public class TestHoodieBinaryCopyHandleSchemaEvolution {
+
+ private HoodieWriteConfig config;
+ @Mock
+ private HoodieTable<?, ?, ?, ?> hoodieTable;
+ @Mock
+ private TaskContextSupplier taskContextSupplier;
+ @Mock
+ private HoodieStorage storage;
+ private Configuration hadoopConf;
+
+ private MessageType fileSchema;
+ private MessageType tableSchema;
+ private Schema avroSchema;
+ private List<StoragePath> inputFiles;
+
+ @BeforeEach
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+
+ // Initialize Hadoop Configuration
+ hadoopConf = new Configuration();
+
+ // Setup test schemas
+ String avroSchemaStr =
"{\"type\":\"record\",\"name\":\"TestRecord\",\"fields\":["
+ + "{\"name\":\"field1\",\"type\":\"string\"},"
+ + "{\"name\":\"field2\",\"type\":\"int\"}"
+ + "]}";
+ avroSchema = new Schema.Parser().parse(avroSchemaStr);
+
+ // Create different schemas for file vs table
+ fileSchema = new AvroSchemaConverter().convert(avroSchema);
+
+ String tableAvroSchemaStr =
"{\"type\":\"record\",\"name\":\"TestRecord\",\"fields\":["
+ + "{\"name\":\"field1\",\"type\":\"string\"},"
+ + "{\"name\":\"field2\",\"type\":\"int\"},"
+ +
"{\"name\":\"field3\",\"type\":[\"null\",\"string\"],\"default\":null}"
+ + "]}";
+ Schema tableAvroSchema = new Schema.Parser().parse(tableAvroSchemaStr);
+ tableSchema = new AvroSchemaConverter().convert(tableAvroSchema);
+
+ inputFiles = Arrays.asList(new StoragePath("/test/file1.parquet"), new
StoragePath("/test/file2.parquet"));
+
+ // Setup mocks
+ when(hoodieTable.getStorage()).thenReturn(storage);
+
when(hoodieTable.getStorageConf()).thenReturn(mock(org.apache.hudi.storage.StorageConfiguration.class));
+
when(hoodieTable.getStorageConf().unwrapAs(Configuration.class)).thenReturn(hadoopConf);
+ }
+
+ @Test
+ public void testSchemaEvolutionDisabled_UsesFileSchema() throws Exception {
+ // Given: Schema evolution is disabled
+ config = HoodieWriteConfig.newBuilder()
+ .withPath("/dummy/path")
+ .build();
+
config.setValue(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE,
"false");
+
+ // Mock ParquetUtils to return file schema
+ try (MockedConstruction<ParquetUtils> parquetUtilsConstruction =
mockConstruction(ParquetUtils.class,
+ (mock, context) -> {
+ when(mock.readSchema(eq(storage),
eq(inputFiles.get(0)))).thenReturn(fileSchema);
+ })) {
+
+ // When: Creating HoodieBinaryCopyHandle (we can't instantiate directly
due to complex dependencies,
+ // so we test the getWriteSchema method logic indirectly)
+ TestableHoodieBinaryCopyHandle handle = new
TestableHoodieBinaryCopyHandle();
+ MessageType result = handle.testGetWriteSchema(config, inputFiles,
hadoopConf, hoodieTable);
+
+ // Then: Should use file schema, not table schema
+ assertEquals(fileSchema, result);
+ // Verify that ParquetUtils was constructed and readSchema was called
+ assertEquals(1, parquetUtilsConstruction.constructed().size());
+ }
+ }
+
+ @Test
+ public void testSchemaEvolutionEnabled_UsesTableSchema() throws Exception {
+ // Given: Schema evolution is explicitly enabled (the default is disabled)
+ config = HoodieWriteConfig.newBuilder()
+ .withPath("/dummy/path")
+ .build();
+
config.setValue(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE,
"true");
+
+ // When: Creating HoodieBinaryCopyHandle
+ TestableHoodieBinaryCopyHandle handle = new
TestableHoodieBinaryCopyHandle();
+ handle.setWriteSchemaWithMetaFields(avroSchema); // Set the expected table
schema
+ MessageType result = handle.testGetWriteSchema(config, inputFiles,
hadoopConf, hoodieTable);
+
+ // Then: Should use table schema (converted from Avro), not file schema
+ assertNotNull(result);
+ // When evolution is enabled, it should use the table schema directly.
+ // NOTE(review): only non-null is asserted here; consider asserting equality
+ // with the AvroSchemaConverter-converted table schema to verify this path.
+ }
+
+ @Test
+ public void testSchemaEvolutionDisabled_FileReadError_ThrowsException()
throws Exception {
+ // Given: Schema evolution is disabled but file read fails
+ config = HoodieWriteConfig.newBuilder()
+ .withPath("/dummy/path")
+ .build();
+
config.setValue(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE,
"false");
+
+ // When: Creating HoodieBinaryCopyHandle with a file that causes read error
+ TestableHoodieBinaryCopyHandle handle = new
TestableHoodieBinaryCopyHandle();
+ handle.setWriteSchemaWithMetaFields(avroSchema);
+ handle.setSimulateFileReadError(true);
+
+ // Then: Should throw HoodieIOException
+ assertThrows(HoodieIOException.class, () ->
+ handle.testGetWriteSchema(config, inputFiles, hadoopConf, hoodieTable)
+ );
+ }
+
+ @Test
+ public void testSchemaEvolutionDisabled_EmptyInputFiles_UsesTableSchema()
throws Exception {
+ // Given: Schema evolution is disabled but no input files
+ config = HoodieWriteConfig.newBuilder()
+ .withPath("/dummy/path")
+ .build();
+
config.setValue(HoodieClusteringConfig.FILE_STITCHING_BINARY_COPY_SCHEMA_EVOLUTION_ENABLE,
"false");
+
+ // When: Creating HoodieBinaryCopyHandle with empty input files
+ TestableHoodieBinaryCopyHandle handle = new
TestableHoodieBinaryCopyHandle();
+ handle.setWriteSchemaWithMetaFields(avroSchema);
+ MessageType result = handle.testGetWriteSchema(config, Arrays.asList(),
hadoopConf, hoodieTable);
+
+ // Then: Should use table schema, not attempt to read from files
+ assertNotNull(result);
+ }
+
+ /**
+ * Testable subclass that exposes the getWriteSchema method for testing.
+ */
+ private static class TestableHoodieBinaryCopyHandle {
+ private Schema writeSchemaWithMetaFields;
+ private boolean simulateFileReadError = false;
+
+ public void setWriteSchemaWithMetaFields(Schema schema) {
+ this.writeSchemaWithMetaFields = schema;
+ }
+
+ public void setSimulateFileReadError(boolean simulateError) {
+ this.simulateFileReadError = simulateError;
+ }
+
+ public MessageType testGetWriteSchema(HoodieWriteConfig config,
List<StoragePath> inputFiles,
+ Configuration conf, HoodieTable<?, ?,
?, ?> table) {
+ if (!config.isBinaryCopySchemaEvolutionEnabled() &&
!inputFiles.isEmpty()) {
+ // When schema evolution is disabled, use the schema from the first
input file
+ // All files should have the same schema in this case
+ try {
+ if (simulateFileReadError) {
+ throw new IOException("Simulated file read error");
+ }
+ ParquetUtils parquetUtils = new ParquetUtils();
+ MessageType fileSchema = parquetUtils.readSchema(table.getStorage(),
inputFiles.get(0));
+ return fileSchema;
+ } catch (Exception e) {
+ throw new HoodieIOException("Failed to read schema from input file
when schema evolution is disabled: " + inputFiles.get(0),
+ e instanceof IOException ? (IOException) e : new IOException(e));
+ }
+ } else {
+ // Default behavior: use the table's write schema for evolution
+ return new
AvroSchemaConverter(conf).convert(writeSchemaWithMetaFields);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
index e5ffda9fc53a..f5b8815658a7 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
@@ -26,6 +26,7 @@ import org.apache.hudi.table.HoodieTable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import java.util.ArrayList;
import java.util.List;
@@ -41,10 +42,12 @@ public class TestPartitionAwareClusteringPlanStrategy {
HoodieTable table;
@Mock
HoodieEngineContext context;
+
HoodieWriteConfig hoodieWriteConfig;
@BeforeEach
public void setUp() {
+ MockitoAnnotations.openMocks(this);
Properties props = new Properties();
props.setProperty("hoodie.clustering.plan.strategy.partition.regex.pattern",
"2021072.*");
this.hoodieWriteConfig = HoodieWriteConfig
@@ -86,4 +89,5 @@ public class TestPartitionAwareClusteringPlanStrategy {
return null;
}
}
-}
+
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkStreamCopyClusteringPlanStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkStreamCopyClusteringPlanStrategy.java
new file mode 100644
index 000000000000..2350f4bdc408
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkStreamCopyClusteringPlanStrategy.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
+import org.apache.hudi.util.Lazy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering Strategy that uses binary stream copy for execution.
+ * This strategy automatically sets the
SparkStreamCopyClusteringExecutionStrategy
+ * as the execution strategy when building the clustering plan.
+ */
+public class SparkStreamCopyClusteringPlanStrategy<T>
+ extends SparkSizeBasedClusteringPlanStrategy<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkStreamCopyClusteringPlanStrategy.class);
+
+ private static final String SPARK_STREAM_COPY_CLUSTERING_EXECUTION_STRATEGY =
+
"org.apache.hudi.client.clustering.run.strategy.SparkStreamCopyClusteringExecutionStrategy";
+
+ public SparkStreamCopyClusteringPlanStrategy(HoodieTable table,
+ HoodieEngineContext
engineContext,
+ HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ @Override
+ protected Map<String, String> getStrategyParams() {
+ Map<String, String> params = new HashMap<>();
+ if
(!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
+ params.put(PLAN_STRATEGY_SORT_COLUMNS.key(),
getWriteConfig().getClusteringSortColumns());
+ }
+ // Add any additional parameters specific to stream copy strategy
+ return params;
+ }
+
+ @Override
+ protected Pair<Stream<HoodieClusteringGroup>, Boolean>
buildClusteringGroupsForPartition(
+ String partitionPath, List<FileSlice> fileSlices) {
+
+ // When schema evolution is disabled, use schema-aware grouping
+ // When schema evolution is enabled, use the default size-only grouping
+ if (!getWriteConfig().isBinaryCopySchemaEvolutionEnabled()) {
+ LOG.info("Schema evolution disabled, using schema-aware grouping for
partition: {}", partitionPath);
+ return
buildClusteringGroupsForPartitionWithSchemaGrouping(partitionPath, fileSlices);
+ } else {
+ LOG.info("Schema evolution enabled, using size-only grouping for
partition: {}", partitionPath);
+ return super.buildClusteringGroupsForPartition(partitionPath,
fileSlices);
+ }
+ }
+
+ /**
+ * Create Clustering group based on files eligible for clustering with
schema-aware grouping.
+ * When schema evolution is disabled, files are grouped by schema hash
first, then by size.
+ * This ensures only files with identical schemas are clustered together.
+ */
+ private Pair<Stream<HoodieClusteringGroup>, Boolean>
buildClusteringGroupsForPartitionWithSchemaGrouping(
+ String partitionPath, List<FileSlice> fileSlices) {
+ HoodieWriteConfig writeConfig = getWriteConfig();
+
+ List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
+ List<FileSlice> currentGroup = new ArrayList<>();
+
+ // Sort fileSlices before dividing
+ List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+ sortedFileSlices.sort((o1, o2) -> (int)
+ ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize()
: writeConfig.getParquetMaxFileSize())
+ - (o1.getBaseFile().isPresent() ?
o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+
+ long totalSizeSoFar = 0;
+ boolean partialScheduled = false;
+ Integer currentGroupSchemaHash = null; // Track schema hash for current
group
+
+ for (FileSlice currentSlice : sortedFileSlices) {
+ long currentSize = currentSlice.getBaseFile().isPresent()
+ ? currentSlice.getBaseFile().get().getFileSize() :
writeConfig.getParquetMaxFileSize();
+
+ // Get schema hash from the file for schema-based grouping
+ Integer currentFileSchemaHash = getFileSchemaHash(currentSlice);
+
+ // Check if we need to create a new group due to schema mismatch
+ boolean schemaMismatch = currentGroupSchemaHash != null &&
!currentGroupSchemaHash.equals(currentFileSchemaHash);
+
+ // Check if max size is reached OR schema is different, and create new
group if needed
+ if ((totalSizeSoFar + currentSize >
writeConfig.getClusteringMaxBytesInGroup() || schemaMismatch)
+ && !currentGroup.isEmpty()) {
+ int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar,
writeConfig.getClusteringTargetFileMaxBytes());
+ LOG.info("Adding clustering group - size: {} max bytes: {} num input
slices: {} output groups: {}{}",
+ totalSizeSoFar, writeConfig.getClusteringMaxBytesInGroup(),
currentGroup.size(), numOutputGroups,
+ schemaMismatch ? " (schema change detected)" : "");
+ fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+ currentGroup = new ArrayList<>();
+ totalSizeSoFar = 0;
+ currentGroupSchemaHash = null;
+
+ // If fileSliceGroups size reaches the max group, stop loop
+ if (fileSliceGroups.size() >= writeConfig.getClusteringMaxNumGroups())
{
+ LOG.info("Having generated the maximum number of groups: {}",
writeConfig.getClusteringMaxNumGroups());
+ partialScheduled = true;
+ break;
+ }
+ }
+
+ // Add to the current file-group
+ currentGroup.add(currentSlice);
+ // Set schema hash for the group if not set
+ if (currentGroupSchemaHash == null) {
+ currentGroupSchemaHash = currentFileSchemaHash;
+ }
+ totalSizeSoFar += currentSize;
+ }
+
+ if (!currentGroup.isEmpty()) {
+ if (currentGroup.size() > 1 ||
writeConfig.shouldClusteringSingleGroup()) {
+ int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar,
writeConfig.getClusteringTargetFileMaxBytes());
+ LOG.info("Adding final clustering group - size: {} max bytes: {} num
input slices: {} output groups: {}",
+ totalSizeSoFar, writeConfig.getClusteringMaxBytesInGroup(),
currentGroup.size(), numOutputGroups);
+ fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+ }
+ }
+
+ return Pair.of(fileSliceGroups.stream().map(fileSliceGroup ->
+ HoodieClusteringGroup.newBuilder()
+ .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+ .setNumOutputFileGroups(fileSliceGroup.getRight())
+ .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+ .build()), partialScheduled);
+ }
+
+ /**
+ * Get the schema hash for a file slice to enable schema-based grouping.
+ */
+ private Integer getFileSchemaHash(FileSlice fileSlice) {
+ if (fileSlice.getBaseFile().isPresent()) {
+ String filePath = fileSlice.getBaseFile().get().getPath();
+ try {
+ // Use centralized ParquetUtils method to read schema hash
+ return ParquetUtils.readSchemaHash(
+ getHoodieTable().getStorage(),
+ new StoragePath(filePath));
+ } catch (Exception e) {
+ LOG.warn("Failed to read schema hash from file: {}", filePath, e);
+ // Return a default hash if we can't read the schema
+ return 0;
+ }
+ }
+ // Return default hash for files without base file
+ return 0;
+ }
+
+ @Override
+ public Option<HoodieClusteringPlan>
generateClusteringPlan(ClusteringPlanActionExecutor executor,
Lazy<List<String>> partitions) {
+ Option<HoodieClusteringPlan> planOption =
super.generateClusteringPlan(executor, partitions);
+
+ if (planOption.isPresent()) {
+ HoodieClusteringPlan plan = planOption.get();
+
+ // Override the execution strategy to use
SparkStreamCopyClusteringExecutionStrategy
+ HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
+
.setStrategyClassName(SPARK_STREAM_COPY_CLUSTERING_EXECUTION_STRATEGY)
+ .setStrategyParams(getStrategyParams())
+ .build();
+
+ LOG.info("Setting execution strategy to
SparkStreamCopyClusteringExecutionStrategy for stream copy clustering");
+
+ return Option.of(HoodieClusteringPlan.newBuilder()
+ .setStrategy(strategy)
+ .setInputGroups(plan.getInputGroups())
+ .setExtraMetadata(getExtraMetadata())
+ .setVersion(getPlanVersion())
+ .setPreserveHoodieMetadata(true)
+ .setMissingSchedulePartitions(plan.getMissingSchedulePartitions())
+ .build());
+ }
+
+ return planOption;
+ }
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkStreamCopyClusteringExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkStreamCopyClusteringExecutionStrategy.java
new file mode 100644
index 000000000000..3e7d1d2c1567
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkStreamCopyClusteringExecutionStrategy.java
@@ -0,0 +1,110 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.client.clustering.run.strategy;

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.parquet.io.ParquetBinaryCopyChecker;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;

/**
 * Clustering execution strategy that uses binary stream copy.
 * This strategy extends SparkBinaryCopyClusteringExecutionStrategy and modifies
 * the supportBinaryStreamCopy method to skip schema checks when schema evolution is disabled.
 */
public class SparkStreamCopyClusteringExecutionStrategy<T>
    extends SparkBinaryCopyClusteringExecutionStrategy<T> {

  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamCopyClusteringExecutionStrategy.class);

  public SparkStreamCopyClusteringExecutionStrategy(HoodieTable table,
                                                    HoodieEngineContext engineContext,
                                                    HoodieWriteConfig writeConfig) {
    super(table, engineContext, writeConfig);
  }

  /**
   * Decides whether the input clustering groups can be merged with a binary stream copy.
   *
   * <p>Preconditions checked, in order: CoW table, no sort columns requested, Parquet base
   * files. When schema evolution is disabled the per-file schema compatibility check is
   * skipped entirely; when enabled, file footers are inspected in parallel on Spark.
   *
   * @param inputGroups    clustering groups whose base files would be stitched
   * @param strategyParams strategy parameters from the clustering plan
   * @return true when binary stream copy can be used, false to fall back to the common strategy
   */
  @Override
  public boolean supportBinaryStreamCopy(List<ClusteringGroupInfo> inputGroups,
                                         Map<String, String> strategyParams) {
    // Binary copy stitches base files byte-wise, so only Copy-on-Write tables qualify.
    if (getHoodieTable().getMetaClient().getTableType() != COPY_ON_WRITE) {
      LOG.warn("Only support CoW table. Will fall back to common clustering execution strategy.");
      return false;
    }

    // Sorting would require decoding records, which defeats a binary (byte-level) copy.
    Option<String[]> orderByColumnsOpt =
        Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
            .map(listStr -> listStr.split(","));
    if (orderByColumnsOpt.isPresent()) {
      LOG.warn("Not support to sort input records. Will fall back to common clustering execution strategy.");
      return false;
    }

    // Row-group stitching is implemented for Parquet only.
    if (!getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat().equals(PARQUET)) {
      LOG.warn("Binary Copy only support parquet as base file format for now.");
      return false;
    }

    // When schema evolution is disabled, all inputs are required to share one schema
    // (enforced later during the copy), so the expensive footer scan is skipped here.
    if (!writeConfig.isBinaryCopySchemaEvolutionEnabled()) {
      LOG.info("Schema evolution disabled, skipping schema compatibility checks for binary stream copy");
      return true;
    }

    LOG.info("Schema evolution enabled, performing schema compatibility checks for binary stream copy");
    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());

    // Capture only the (serializable) storage configuration in the closure. Calling
    // getHoodieTable() inside the lambda would pull `this` — the whole strategy and
    // table — into Spark task serialization.
    StorageConfiguration<?> storageConf = getHoodieTable().getStorageConf();
    List<ParquetBinaryCopyChecker.ParquetFileInfo> fileStatus = jsc.parallelize(inputGroups, inputGroups.size())
        .flatMap(group -> group.getOperations().iterator())
        .map(op -> ParquetBinaryCopyChecker.collectFileInfo(
            storageConf.unwrapAs(Configuration.class),
            op.getDataFilePath()))
        .collect();

    boolean compatible = ParquetBinaryCopyChecker.verifyFiles(fileStatus);
    if (!compatible) {
      LOG.warn("Schema compatibility check failed. Will fall back to common clustering execution strategy.");
    }
    return compatible;
  }
}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileBinaryCopier.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileBinaryCopier.java
index b414ad534c5f..690f12ebdd1c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileBinaryCopier.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileBinaryCopier.java
@@ -24,7 +24,6 @@ import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.List;
-import java.util.Properties;
/**
* HoodieFileBinaryCopier is a high-performance utility designed for efficient
merging of data files at the binary level.
@@ -34,7 +33,7 @@ import java.util.Properties;
*/
public interface HoodieFileBinaryCopier {
- long binaryCopy(List<StoragePath> inputFilePaths, List<StoragePath>
outputFilePath, MessageType writeSchema, Properties props) throws IOException;
+ long binaryCopy(List<StoragePath> inputFilePaths, List<StoragePath>
outputFilePath, MessageType writeSchema, boolean schemaEvolutionEnabled) throws
IOException;
void close() throws IOException;
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index e47468b54c33..c247945d9e46 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -232,6 +232,21 @@ public class ParquetUtils extends FileFormatUtils {
public MessageType readSchema(HoodieStorage storage, StoragePath
parquetFilePath) {
return readMetadata(storage,
parquetFilePath).getFileMetaData().getSchema();
}
+
+ /**
+ * Get the hash code of the schema from a parquet file.
+ * This is useful for quickly comparing schemas without full comparison.
+ */
+ public static Integer readSchemaHash(HoodieStorage storage, StoragePath
parquetFilePath) {
+ try {
+ ParquetUtils parquetUtils = new ParquetUtils();
+ MessageType schema = parquetUtils.readSchema(storage, parquetFilePath);
+ return schema.hashCode();
+ } catch (Exception e) {
+ LOG.warn("Failed to read schema hash from file: " + parquetFilePath, e);
+ return 0;
+ }
+ }
@Override
public Map<String, String> readFooter(HoodieStorage storage, boolean
required,
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetBinaryCopyBase.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetBinaryCopyBase.java
index 3bbd46a013e3..46640b619e02 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetBinaryCopyBase.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetBinaryCopyBase.java
@@ -120,10 +120,17 @@ public abstract class HoodieParquetBinaryCopyBase
implements Closeable {
protected MessageType requiredSchema = null;
protected Configuration conf;

  // Flag to control schema evolution behavior.
  // Boxed Boolean on purpose: null means "not configured yet" — binaryCopy() must call
  // setSchemaEvolutionEnabled(...) before block processing, which fails fast on null.
  protected Boolean schemaEvolutionEnabled = null;

  public HoodieParquetBinaryCopyBase(Configuration conf) {
    this.conf = conf;
  }

  /** Enables or disables schema-evolution handling for the subsequent binary copy. */
  public void setSchemaEvolutionEnabled(boolean enabled) {
    this.schemaEvolutionEnabled = enabled;
  }
protected void initFileWriter(Path outPutFile, CompressionCodecName
newCodecName, MessageType schema) {
try {
@@ -188,9 +195,14 @@ public abstract class HoodieParquetBinaryCopyBase
implements Closeable {
for (int i = 0; i < columnsInOrder.size(); i++) {
ColumnChunkMetaData chunk = columnsInOrder.get(i);
ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+ if (schemaEvolutionEnabled == null) {
+ throw new HoodieException("The variable 'schemaEvolutionEnabled' is
supposed to be set in "
+ + "binaryCopy() before calling this method
processBlocksFromReader");
+ }
// resolve the conflict schema between avro parquet write support and
spark native parquet write support
- if (descriptor == null) {
+ // Only attempt legacy conversion if schema evolution is enabled
+ if (descriptor == null && schemaEvolutionEnabled) {
String[] path = chunk.getPath().toArray();
path = Arrays.copyOf(path, path.length);
if (convertLegacy3LevelArray(path) || convertLegacyMap(path)) {
@@ -226,6 +238,14 @@ public abstract class HoodieParquetBinaryCopyBase
implements Closeable {
CompressionCodecName newCodecName = this.newCodecName == null ?
chunk.getCodec() : this.newCodecName;
if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
+ // Check if this is NOT the FILENAME_METADATA_FIELD and schema
evolution is disabled
+ if
(!chunk.getPath().toDotString().equals(HoodieRecord.FILENAME_METADATA_FIELD)) {
+ if (!schemaEvolutionEnabled) {
+ throw new HoodieException("Column masking for '" +
chunk.getPath().toDotString()
+ + "' requires schema evolution to be enabled. "
+ + "Set
'hoodie.clustering.plan.strategy.binary.copy.schema.evolution.enable' to
true.");
+ }
+ }
// Mask column and compress it again.
Binary maskValue = maskColumns.get(chunk.getPath());
if (maskValue != null) {
@@ -261,6 +281,16 @@ public abstract class HoodieParquetBinaryCopyBase
implements Closeable {
.stream()
.filter(c -> !converted.contains(c))
.collect(Collectors.toList());
+
+ // If schema evolution is disabled and there are missing columns, throw an
exception
+ if (!schemaEvolutionEnabled && !missedColumns.isEmpty()) {
+ String missingColumnsStr = missedColumns.stream()
+ .map(c -> String.join(".", c.getPath()))
+ .collect(Collectors.joining(", "));
+ throw new HoodieException("Schema evolution is disabled but found
missing columns in input file: "
+ + missingColumnsStr + ". All input files must have the same schema
when schema evolution is disabled.");
+ }
+
for (ColumnDescriptor descriptor : missedColumns) {
addNullColumn(
descriptor,
@@ -644,18 +674,32 @@ public abstract class HoodieParquetBinaryCopyBase
implements Closeable {
@VisibleForTesting
public boolean convertLegacy3LevelArray(String[] path) {
boolean changed = false;
- for (int i = 0; i < path.length; i++) {
- try {
- String[] subPath = Arrays.copyOf(path, i + 1);
- Type type = this.requiredSchema.getType(subPath);
- if (type.getOriginalType() == LIST && "bag".equals(path[i + 1])) {
- // Convert from xxx.bag.array to xxx.list.element
- path[i + 1] = "list";
- path[i + 2] = "element";
- changed = true;
+
+ // For schema evolution, also check for legacy patterns even if the field
doesn't exist in the schema
+ for (int i = 0; i < path.length - 2; i++) {
+ if ("bag".equals(path[i + 1]) && "array_element".equals(path[i + 2])) {
+ // Convert from xxx.bag.array_element to xxx.list.element
+ path[i + 1] = "list";
+ path[i + 2] = "element";
+ changed = true;
+ break;
+ }
+ }
+
+ if (!changed) {
+ for (int i = 0; i < path.length; i++) {
+ try {
+ String[] subPath = Arrays.copyOf(path, i + 1);
+ Type type = this.requiredSchema.getType(subPath);
+ if (type.getOriginalType() == LIST && i + 1 < path.length &&
"bag".equals(path[i + 1])) {
+ // Convert from xxx.bag.array to xxx.list.element
+ path[i + 1] = "list";
+ path[i + 2] = "element";
+ changed = true;
+ }
+ } catch (InvalidRecordException e) {
+ LOG.debug("field not found due to schema evolution, nothing need to
do");
}
- } catch (InvalidRecordException e) {
- LOG.debug("field not found due to schema evolution, nothing need to
do");
}
}
return changed;
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetFileBinaryCopier.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetFileBinaryCopier.java
index 140cfd437d73..7ca88182d83d 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetFileBinaryCopier.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/HoodieParquetFileBinaryCopier.java
@@ -41,7 +41,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Queue;
import java.util.Set;
@@ -99,7 +98,10 @@ public class HoodieParquetFileBinaryCopier extends
HoodieParquetBinaryCopyBase i
public long binaryCopy(List<StoragePath> inputFilePaths,
List<StoragePath> outputFilePath,
MessageType writeSchema,
- Properties props) throws IOException {
+ boolean schemaEvolutionEnabled) throws IOException {
+ // Set schema evolution enabled flag
+ setSchemaEvolutionEnabled(schemaEvolutionEnabled);
+
openInputFiles(inputFilePaths, conf);
initFileWriter(new Path(outputFilePath.get(0).toUri()), codecName,
writeSchema);
initNextReader();
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index f86ae394b10e..17e80ab63f95 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -397,4 +398,108 @@ public class TestParquetUtils extends
HoodieCommonTestHarness {
return Arrays.asList(new String[]{partitionField});
}
}
+
+ @Test
+ public void testReadSchemaHash() throws Exception {
+ // Given: Create a parquet file with a specific schema
+ List<String> rowKeys = Arrays.asList("row1", "row2", "row3");
+ String filePath = Paths.get(basePath,
"test_schema_hash.parquet").toUri().toString();
+ writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys);
+
+ StoragePath storagePath = new StoragePath(filePath);
+
+ // When: Reading schema hash
+ Integer schemaHash =
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(filePath), storagePath);
+
+ // Then: Should return a valid hash
+ assertTrue(schemaHash != null, "Schema hash should not be null");
+ assertTrue(schemaHash != 0, "Schema hash should not be zero (default error
value)");
+
+ // Verify consistency - reading same file should return same hash
+ Integer secondRead =
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(filePath), storagePath);
+ assertEquals(schemaHash, secondRead, "Schema hash should be consistent
across reads");
+ }
+
+ @Test
+ public void testReadSchemaHash_DifferentSchemas() throws Exception {
+ // Given: Create two parquet files with different schemas
+ List<String> rowKeys = Arrays.asList("row1", "row2");
+
+ // File 1 with original schema
+ String filePath1 = Paths.get(basePath,
"test_schema1.parquet").toUri().toString();
+ writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath1, rowKeys);
+
+ // File 2 with extended schema (add a field)
+ String filePath2 = Paths.get(basePath,
"test_schema2.parquet").toUri().toString();
+ writeParquetFileWithExtendedSchema(filePath2, rowKeys);
+
+ // When: Reading schema hashes from both files
+ Integer hash1 =
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(filePath1), new
StoragePath(filePath1));
+ Integer hash2 =
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(filePath2), new
StoragePath(filePath2));
+
+ // Then: Should have different hashes for different schemas
+ assertTrue(hash1 != null && hash2 != null, "Both schema hashes should be
valid");
+ assertTrue(!hash1.equals(hash2), "Different schemas should have different
hash codes");
+ }
+
+ @Test
+ public void testReadSchemaHash_NonExistentFile() throws Exception {
+ // Given: Non-existent file path
+ StoragePath nonExistentPath = new
StoragePath("/non/existent/file.parquet");
+
+ // When: Reading schema hash from non-existent file
+ Integer schemaHash =
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(basePath),
nonExistentPath);
+
+ // Then: Should return 0 (error default value)
+ assertEquals(Integer.valueOf(0), schemaHash, "Non-existent file should
return default error value 0");
+ }
+
+ @Test
+ public void testReadSchemaHash_MatchesDirectSchemaRead() throws Exception {
+ // Given: Create a parquet file
+ List<String> rowKeys = Arrays.asList("row1", "row2", "row3");
+ String filePath = Paths.get(basePath,
"test_direct_schema.parquet").toUri().toString();
+ writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys);
+
+ StoragePath storagePath = new StoragePath(filePath);
+
+ // When: Reading schema hash vs direct schema read
+ Integer schemaHashFromUtils =
ParquetUtils.readSchemaHash(HoodieTestUtils.getStorage(filePath), storagePath);
+ MessageType directSchema =
parquetUtils.readSchema(HoodieTestUtils.getStorage(filePath), storagePath);
+ Integer directSchemaHash = directSchema.hashCode();
+
+ // Then: Hash from utility method should match direct schema hash
+ assertEquals(directSchemaHash, schemaHashFromUtils,
+ "Schema hash from utility should match direct schema.hashCode()");
+ }
+
+ private void writeParquetFileWithExtendedSchema(String filePath,
List<String> rowKeys) throws Exception {
+ // Create an extended schema with an additional field
+ Schema extendedSchema = Schema.createRecord("record", "", "", false);
+ List<Schema.Field> fields = new ArrayList<>();
+ fields.add(new Schema.Field("_row_key", Schema.create(Schema.Type.STRING),
"", (Object) null));
+ fields.add(new Schema.Field("time", Schema.create(Schema.Type.LONG), "",
(Object) null));
+ fields.add(new Schema.Field("number", Schema.create(Schema.Type.LONG), "",
(Object) null));
+ fields.add(new Schema.Field("extra_field",
createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE)); //
Additional field
+ extendedSchema.setFields(fields);
+
+ BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001,
-1, BloomFilterTypeCode.SIMPLE.name());
+
+ HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+ new AvroSchemaConverter().convert(extendedSchema), extendedSchema,
Option.of(filter), new Properties());
+
+ ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport,
CompressionCodecName.GZIP,
+ 120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE);
+
+ for (String rowKey : rowKeys) {
+ GenericRecord record = new GenericData.Record(extendedSchema);
+ record.put("_row_key", rowKey);
+ record.put("time", 1234567L);
+ record.put("number", 12345L);
+ record.put("extra_field", "extra_value"); // Set the extra field
+ writer.write(record);
+ writeSupport.add(rowKey);
+ }
+ writer.close();
+ }
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetBinaryCopyBaseSchemaEvolution.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetBinaryCopyBaseSchemaEvolution.java
new file mode 100644
index 000000000000..e7e1ca7d442d
--- /dev/null
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetBinaryCopyBaseSchemaEvolution.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.parquet.io;
+
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
 * Tests for HoodieParquetBinaryCopyBase schema evolution behavior.
 */
public class TestHoodieParquetBinaryCopyBaseSchemaEvolution {

  @Mock
  private CompressionConverter.TransParquetFileReader reader;
  @Mock
  private ParquetMetadata parquetMetadata;
  @Mock
  private FileMetaData fileMetaData;
  @Mock
  private BlockMetaData blockMetaData;
  @Mock
  private ColumnChunkMetaData columnChunkMetaData;
  @Mock
  private EncodingStats encodingStats;

  private TestableHoodieParquetBinaryCopyBase copyBase;
  // Schema the merged output must satisfy (field1..field3).
  private MessageType requiredSchema;
  // Schema of the mocked input file; deliberately missing field3.
  private MessageType fileSchema;

  @BeforeEach
  public void setUp() {
    MockitoAnnotations.openMocks(this);

    // Setup test schemas
    requiredSchema = Types.buildMessage()
        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, PrimitiveType.Repetition.REQUIRED)
            .named("field1"))
        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, PrimitiveType.Repetition.OPTIONAL)
            .named("field2"))
        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, PrimitiveType.Repetition.OPTIONAL)
            .named("field3"))
        .named("TestRecord");

    // File schema missing field3
    fileSchema = Types.buildMessage()
        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, PrimitiveType.Repetition.REQUIRED)
            .named("field1"))
        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, PrimitiveType.Repetition.OPTIONAL)
            .named("field2"))
        .named("TestRecord");

    copyBase = spy(new TestableHoodieParquetBinaryCopyBase(new Configuration()));
    copyBase.requiredSchema = requiredSchema;

    // Setup mocks
    when(reader.getFooter()).thenReturn(parquetMetadata);
    when(parquetMetadata.getFileMetaData()).thenReturn(fileMetaData);
    when(fileMetaData.getSchema()).thenReturn(fileSchema);
    when(columnChunkMetaData.getEncodingStats()).thenReturn(encodingStats);

    // Mock the addNullColumn method to avoid complex setup
    doNothing().when(copyBase).addNullColumn(any(ColumnDescriptor.class), anyLong(), any(EncodingStats.class),
        any(), any(MessageType.class), any(CompressionCodecName.class));
  }

  @Test
  public void testSchemaEvolutionEnabled_AllowsMissingColumns() throws Exception {
    // Given: Schema evolution is enabled (default)
    copyBase.setSchemaEvolutionEnabled(true);

    List<ColumnChunkMetaData> columnsInOrder = Arrays.asList(columnChunkMetaData);
    when(blockMetaData.getColumns()).thenReturn(columnsInOrder);

    // Setup mock to simulate processing logic without the complex ParquetFileWriter setup
    copyBase.setupForTesting(reader, blockMetaData, columnsInOrder);

    // When: Processing blocks with missing columns
    // Then: Should not throw exception
    assertDoesNotThrow(() -> {
      copyBase.testProcessMissedColumns();
    });

    // Should call addNullColumn for missing field3
    verify(copyBase, times(1)).addNullColumn(any(ColumnDescriptor.class), anyLong(),
        any(EncodingStats.class), any(), eq(requiredSchema), any(CompressionCodecName.class));
  }

  @Test
  public void testSchemaEvolutionDisabled_ThrowsExceptionOnMissingColumns() throws Exception {
    // Given: Schema evolution is disabled
    copyBase.setSchemaEvolutionEnabled(false);

    List<ColumnChunkMetaData> columnsInOrder = Arrays.asList(columnChunkMetaData);
    when(blockMetaData.getColumns()).thenReturn(columnsInOrder);

    copyBase.setupForTesting(reader, blockMetaData, columnsInOrder);

    // When: Processing blocks with missing columns
    // Then: Should throw HoodieException
    HoodieException exception = assertThrows(HoodieException.class, () -> {
      copyBase.testProcessMissedColumns();
    });

    // Verify exception message contains information about missing columns
    assertEquals("Schema evolution is disabled but found missing columns in input file: field3. "
        + "All input files must have the same schema when schema evolution is disabled.", exception.getMessage());

    // Should not call addNullColumn when evolution is disabled
    verify(copyBase, never()).addNullColumn(any(ColumnDescriptor.class), anyLong(),
        any(EncodingStats.class), any(), any(MessageType.class), any(CompressionCodecName.class));
  }

  @Test
  public void testSchemaEvolutionDisabled_NoMissingColumns_DoesNotThrow() throws Exception {
    // Given: Schema evolution is disabled, but file schema matches required schema
    copyBase.setSchemaEvolutionEnabled(false);

    // File schema with all required fields
    MessageType completeFileSchema = Types.buildMessage()
        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, PrimitiveType.Repetition.REQUIRED)
            .named("field1"))
        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, PrimitiveType.Repetition.OPTIONAL)
            .named("field2"))
        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, PrimitiveType.Repetition.OPTIONAL)
            .named("field3"))
        .named("TestRecord");

    when(fileMetaData.getSchema()).thenReturn(completeFileSchema);

    List<ColumnChunkMetaData> columnsInOrder = Arrays.asList(columnChunkMetaData);
    when(blockMetaData.getColumns()).thenReturn(columnsInOrder);

    copyBase.setupForTesting(reader, blockMetaData, columnsInOrder);

    // When: Processing blocks with no missing columns
    // Then: Should not throw exception
    assertDoesNotThrow(() -> {
      copyBase.testProcessMissedColumns();
    });

    // Should not call addNullColumn when no columns are missing
    verify(copyBase, never()).addNullColumn(any(ColumnDescriptor.class), anyLong(),
        any(EncodingStats.class), any(), any(MessageType.class), any(CompressionCodecName.class));
  }

  @Test
  public void testSchemaEvolutionDisabled_SkipsLegacyConversion() throws Exception {
    // Given: Schema evolution is disabled
    copyBase.setSchemaEvolutionEnabled(false);

    // Setup a column that would normally trigger legacy conversion
    ColumnChunkMetaData legacyColumn = mock(ColumnChunkMetaData.class);
    ColumnPath legacyPath = ColumnPath.fromDotString("testArray.bag.array_element");
    when(legacyColumn.getPath()).thenReturn(legacyPath);

    List<ColumnChunkMetaData> columnsInOrder = Arrays.asList(legacyColumn);
    when(blockMetaData.getColumns()).thenReturn(columnsInOrder);

    // Create descriptors map that doesn't contain the legacy path (simulating descriptor == null case)
    Map<ColumnPath, ColumnDescriptor> descriptorsMap = new HashMap<>();

    copyBase.setupForTesting(reader, blockMetaData, columnsInOrder);

    // When: Processing columns with legacy paths
    boolean legacyConversionAttempted = copyBase.testLegacyConversionLogic(legacyPath, descriptorsMap);

    // Then: Legacy conversion should be skipped
    assertEquals(false, legacyConversionAttempted, "Legacy conversion should be skipped when schema evolution is disabled");
  }

  @Test
  public void testSchemaEvolutionEnabled_AllowsLegacyConversion() throws Exception {
    // Given: Schema evolution is enabled
    copyBase.setSchemaEvolutionEnabled(true);

    // Setup a column that would trigger legacy conversion
    ColumnChunkMetaData legacyColumn = mock(ColumnChunkMetaData.class);
    ColumnPath legacyPath = ColumnPath.fromDotString("testArray.bag.array_element");
    when(legacyColumn.getPath()).thenReturn(legacyPath);

    List<ColumnChunkMetaData> columnsInOrder = Arrays.asList(legacyColumn);
    when(blockMetaData.getColumns()).thenReturn(columnsInOrder);

    // Create descriptors map that doesn't contain the legacy path
    Map<ColumnPath, ColumnDescriptor> descriptorsMap = new HashMap<>();

    copyBase.setupForTesting(reader, blockMetaData, columnsInOrder);

    // When: Processing columns with legacy paths
    boolean legacyConversionAttempted = copyBase.testLegacyConversionLogic(legacyPath, descriptorsMap);

    // Then: Legacy conversion should be attempted
    assertEquals(true, legacyConversionAttempted, "Legacy conversion should be attempted when schema evolution is enabled");
  }

  /**
   * Testable subclass that exposes internal methods and provides test setup.
   */
  private static class TestableHoodieParquetBinaryCopyBase extends HoodieParquetBinaryCopyBase {
    private CompressionConverter.TransParquetFileReader testReader;
    private BlockMetaData testBlock;
    private List<ColumnChunkMetaData> testColumnsInOrder;

    public TestableHoodieParquetBinaryCopyBase(Configuration conf) {
      super(conf);
    }

    public void setupForTesting(CompressionConverter.TransParquetFileReader reader,
                                BlockMetaData block, List<ColumnChunkMetaData> columnsInOrder) {
      this.testReader = reader;
      this.testBlock = block;
      this.testColumnsInOrder = columnsInOrder;
    }

    // Re-implements the missed-column handling of the production copy path in isolation
    // so it can be driven with mocks. NOTE(review): this duplicates logic from
    // HoodieParquetBinaryCopyBase — keep the two in sync.
    public void testProcessMissedColumns() throws Exception {
      // Simulate the missed columns processing logic
      ParquetMetadata meta = testReader.getFooter();
      ColumnChunkMetaData columnChunkMetaData = testColumnsInOrder.get(0);
      EncodingStats encodingStats = columnChunkMetaData.getEncodingStats();

      List<ColumnDescriptor> missedColumns = missedColumns(requiredSchema, meta.getFileMetaData().getSchema())
          .stream()
          .collect(Collectors.toList());

      // If schema evolution is disabled and there are missing columns, throw an exception
      if (!schemaEvolutionEnabled && !missedColumns.isEmpty()) {
        String missingColumnsStr = missedColumns.stream()
            .map(c -> String.join(".", c.getPath()))
            .collect(Collectors.joining(", "));
        throw new HoodieException("Schema evolution is disabled but found missing columns in input file: "
            + missingColumnsStr + ". All input files must have the same schema when schema evolution is disabled.");
      }

      for (ColumnDescriptor descriptor : missedColumns) {
        addNullColumn(descriptor, 100L, encodingStats, null, requiredSchema, CompressionCodecName.SNAPPY);
      }
    }

    // Mirrors the descriptor-null branch of the production block processing: legacy
    // path conversion may only run when schema evolution is enabled.
    public boolean testLegacyConversionLogic(ColumnPath columnPath, Map<ColumnPath, ColumnDescriptor> descriptorsMap) {
      ColumnDescriptor descriptor = descriptorsMap.get(columnPath);

      // resolve the conflict schema between avro parquet write support and spark native parquet write support
      // Only attempt legacy conversion if schema evolution is enabled
      if (descriptor == null && schemaEvolutionEnabled) {
        String[] path = columnPath.toArray();
        path = Arrays.copyOf(path, path.length);
        if (convertLegacy3LevelArray(path) || convertLegacyMap(path)) {
          return true; // Legacy conversion was attempted
        }
      }
      return false; // Legacy conversion was not attempted
    }

    // Columns present in requiredSchema but absent from the file schema.
    private List<ColumnDescriptor> missedColumns(MessageType requiredSchema, MessageType fileSchema) {
      return requiredSchema.getColumns().stream()
          .filter(col -> !fileSchema.containsPath(col.getPath()))
          .collect(Collectors.toList());
    }

    // Mock implementation to avoid complex ParquetFileWriter setup
    protected void addNullColumn(ColumnDescriptor descriptor, long totalChunkValues, EncodingStats encodingStats,
                                 Object writer, MessageType schema, CompressionCodecName newCodecName) {
      // Mock implementation - do nothing
    }

    @Override
    protected Map<String, String> finalizeMetadata() {
      return new HashMap<>();
    }
  }
}
\ No newline at end of file
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetFileBinaryCopier.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetFileBinaryCopier.java
index 20a62978a4f3..9d85f371f008 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetFileBinaryCopier.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/parquet/io/TestHoodieParquetFileBinaryCopier.java
@@ -70,7 +70,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
@@ -125,7 +124,7 @@ public class TestHoodieParquetFileBinaryCopier {
.map(StoragePath::new)
.collect(Collectors.toList());
StoragePath outputPath = new StoragePath(outputFile);
- writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
schema, new Properties());
+ writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
schema, true);
writer.close();
verify(schema, CompressionCodecName.GZIP);
}
@@ -143,7 +142,7 @@ public class TestHoodieParquetFileBinaryCopier {
.map(StoragePath::new)
.collect(Collectors.toList());
StoragePath outputPath = new StoragePath(outputFile);
- writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
schema, new Properties());
+ writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
schema, true);
writer.close();
verify(schema, CompressionCodecName.ZSTD);
}
@@ -171,7 +170,7 @@ public class TestHoodieParquetFileBinaryCopier {
.map(StoragePath::new)
.collect(Collectors.toList());
StoragePath outputPath = new StoragePath(outputFile);
- writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
schema1, new Properties());
+ writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
schema1, true);
writer.close();
verify(schema1, CompressionCodecName.UNCOMPRESSED);
}
@@ -201,7 +200,7 @@ public class TestHoodieParquetFileBinaryCopier {
writer = parquetFileBinaryCopier(requiredSchema, "GZIP");
StoragePath outputPath = new StoragePath(outputFile);
- writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
requiredSchema, new Properties());
+ writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
requiredSchema, true);
writer.close();
List<ColumnDescriptor> columns = inputSchema.getColumns();
@@ -271,7 +270,7 @@ public class TestHoodieParquetFileBinaryCopier {
writer = parquetFileBinaryCopier(requiredSchema, "GZIP");
StoragePath outputPath = new StoragePath(outputFile);
- writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
requiredSchema, new Properties());
+ writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
requiredSchema, true);
writer.close();
List<ColumnDescriptor> columns = inputSchema.getColumns();
@@ -313,7 +312,7 @@ public class TestHoodieParquetFileBinaryCopier {
writer = parquetFileBinaryCopier(requiredSchema, "GZIP");
StoragePath outputPath = new StoragePath(outputFile);
- writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
requiredSchema, new Properties());
+ writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
requiredSchema, true);
writer.close();
List<ColumnDescriptor> columns = inputSchema.getColumns();
@@ -388,7 +387,7 @@ public class TestHoodieParquetFileBinaryCopier {
writer = parquetFileBinaryCopier(requiredSchema, "GZIP");
StoragePath outputPath = new StoragePath(outputFile);
- writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
requiredSchema, new Properties());
+ writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
requiredSchema, true);
writer.close();
List<ColumnDescriptor> columns = inputSchema.getColumns();
Assertions.assertEquals(5, columns.size());
@@ -420,7 +419,7 @@ public class TestHoodieParquetFileBinaryCopier {
.map(StoragePath::new)
.collect(Collectors.toList());
StoragePath outputPath = new StoragePath(outputFile);
- writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
schema, new Properties());
+ writer.binaryCopy(inputPaths, Collections.singletonList(outputPath),
schema, true);
writer.close();
verify(schema, CompressionCodecName.GZIP);
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringBinaryCopyStrategy.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringBinaryCopyStrategy.scala
index 696e9bba6b6f..fec133b647f4 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringBinaryCopyStrategy.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringBinaryCopyStrategy.scala
@@ -47,7 +47,8 @@ class TestClusteringBinaryCopyStrategy extends
HoodieSparkProcedureTestBase {
"hoodie.sql.bulk.insert.enable" -> "true",
"hoodie.sql.insert.mode" -> "non-strict",
"hoodie.combine.before.insert" -> "false",
- "hoodie.parquet.small.file.limit" -> "-1"
+ "hoodie.parquet.small.file.limit" -> "-1",
+
"hoodie.clustering.plan.strategy.binary.copy.schema.evolution.enable" -> "true"
)
case "insert" =>
Map(
@@ -55,7 +56,8 @@ class TestClusteringBinaryCopyStrategy extends
HoodieSparkProcedureTestBase {
"hoodie.sql.insert.mode" -> "non-strict",
"hoodie.combine.before.insert" -> "false",
"spark.hadoop.parquet.avro.write-old-list-structure" -> "false",
- "hoodie.parquet.small.file.limit" -> "-1"
+ "hoodie.parquet.small.file.limit" -> "-1",
+
"hoodie.clustering.plan.strategy.binary.copy.schema.evolution.enable" -> "true"
)
}
withSQLConf(conf.toSeq: _*) {