This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8bf44c01b56 [HUDI-6821] Support multiple base file formats in Hudi table (#9761)
8bf44c01b56 is described below
commit 8bf44c01b56dd3afe5323dc7566971cee2e46d50
Author: Sagar Sumit <[email protected]>
AuthorDate: Thu Oct 26 09:27:02 2023 +0530
[HUDI-6821] Support multiple base file formats in Hudi table (#9761)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 11 +-
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 3 +-
.../java/org/apache/hudi/table/HoodieTable.java | 10 +-
.../table/action/bootstrap/BootstrapUtils.java | 9 +-
...sistentHashingBucketClusteringPlanStrategy.java | 4 +-
.../rollback/ListingBasedRollbackStrategy.java | 6 +-
.../table/upgrade/ZeroToOneUpgradeHandler.java | 7 +-
.../io/storage/row/HoodieRowDataCreateHandle.java | 4 +-
.../client/TestHoodieJavaWriteClientInsert.java | 4 +-
.../hudi/client/TestJavaHoodieBackedMetadata.java | 5 -
.../TestHoodieJavaClientOnCopyOnWriteStorage.java | 3 +-
.../commit/TestJavaCopyOnWriteActionExecutor.java | 4 +-
.../testutils/HoodieJavaClientTestHarness.java | 4 +
.../SparkBootstrapCommitActionExecutor.java | 2 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 14 +-
.../table/action/bootstrap/TestBootstrapUtils.java | 12 +-
.../commit/TestCopyOnWriteActionExecutor.java | 5 +-
.../TestHoodieSparkMergeOnReadTableRollback.java | 2 +-
.../hudi/testutils/HoodieClientTestBase.java | 5 +
.../testutils/HoodieSparkClientTestHarness.java | 5 -
.../apache/hudi/common/model/HoodieFileFormat.java | 9 +
.../hudi/common/table/HoodieTableConfig.java | 10 +
.../hudi/common/table/HoodieTableMetaClient.java | 19 +-
.../org/apache/hudi/common/util/BaseFileUtils.java | 5 -
.../org/apache/hudi/common/fs/TestFSUtils.java | 27 ++
.../hudi/common/testutils/HoodieTestTable.java | 3 +-
.../org/apache/hudi/BaseFileOnlyRelation.scala | 4 +-
.../main/scala/org/apache/hudi/DefaultSource.scala | 52 ++--
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 107 +++++++-
...tils.scala => HoodieSparkFileFormatUtils.scala} | 35 +--
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 9 +-
.../hudi/MergeOnReadIncrementalRelation.scala | 4 +-
.../apache/hudi/MergeOnReadSnapshotRelation.scala | 92 -------
.../sql/catalyst/catalog/HoodieCatalogTable.scala | 4 +-
.../datasources/HoodieMultipleBaseFileFormat.scala | 278 +++++++++++++++++++++
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 2 +-
.../RepairMigratePartitionMetaProcedure.scala | 2 +-
.../org/apache/hudi/functional/TestBootstrap.java | 8 +-
.../apache/hudi/functional/TestOrcBootstrap.java | 8 +-
.../apache/hudi/testutils/DataSourceTestUtils.java | 20 +-
.../TestHoodieMultipleBaseFileFormat.scala | 123 +++++++++
.../datasources/Spark32NestedSchemaPruning.scala | 3 +-
.../hudi/utilities/streamer/HoodieStreamer.java | 10 +-
43 files changed, 712 insertions(+), 241 deletions(-)
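For context, a minimal usage sketch (not part of this diff) built only from the config keys and builder methods touched below; the table name, base path, Hadoop configuration, and Avro schema string are illustrative placeholders.

    // Create a table with multiple base file formats enabled; this persists
    // hoodie.table.multiple.base.file.formats.enable=true into hoodie.properties.
    HoodieTableMetaClient.withPropertyBuilder()
        .setTableType(HoodieTableType.COPY_ON_WRITE)
        .setTableName("multi_format_table")                  // hypothetical table name
        .setMultipleBaseFileFormatsEnabled(true)             // new table property in this commit
        .initTable(hadoopConf, "/tmp/hudi/multi_format_table");

    // Pick the base file format per write via hoodie.base.file.format; when the table
    // property above is set, HoodieTable#getBaseFileFormat prefers this write config
    // value over the format recorded in the table config.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/multi_format_table")
        .withSchema(avroSchemaString)                        // assumed to be defined elsewhere
        .withBaseFileFormat(HoodieFileFormat.ORC.name())     // new builder method in this commit
        .build();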
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index cc3876338cc..5ae7ab25fbd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -219,7 +219,7 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "the timeline as an immutable log relying only on atomic writes
for object storage.");
public static final ConfigProperty<HoodieFileFormat> BASE_FILE_FORMAT =
ConfigProperty
- .key("hoodie.table.base.file.format")
+ .key("hoodie.base.file.format")
.defaultValue(HoodieFileFormat.PARQUET)
.withValidValues(HoodieFileFormat.PARQUET.name(),
HoodieFileFormat.ORC.name(), HoodieFileFormat.HFILE.name())
.withAlternatives("hoodie.table.ro.file.format")
@@ -1198,6 +1198,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(BASE_PATH);
}
+ public HoodieFileFormat getBaseFileFormat() {
+ return HoodieFileFormat.valueOf(getStringOrDefault(BASE_FILE_FORMAT));
+ }
+
public HoodieRecordMerger getRecordMerger() {
List<String> mergers =
StringUtils.split(getStringOrDefault(RECORD_MERGER_IMPLS), ",").stream()
.map(String::trim)
@@ -2705,6 +2709,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withBaseFileFormat(String baseFileFormat) {
+ writeConfig.setValue(BASE_FILE_FORMAT,
HoodieFileFormat.valueOf(baseFileFormat).name());
+ return this;
+ }
+
public Builder withSchema(String schemaStr) {
writeConfig.setValue(AVRO_SCHEMA_STRING, schemaStr);
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 8c76e322b09..9d1bb6d511e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -122,8 +122,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends
HoodieIOHandle<T, I,
throw new HoodieIOException("Failed to make dir " + path, e);
}
- return new Path(path.toString(), FSUtils.makeBaseFileName(instantTime,
writeToken, fileId,
-
hoodieTable.getMetaClient().getTableConfig().getBaseFileFormat().getFileExtension()));
+ return new Path(path.toString(), FSUtils.makeBaseFileName(instantTime,
writeToken, fileId, hoodieTable.getBaseFileExtension()));
}
/**
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8cc1dcf924c..36a5e6de21a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -217,7 +217,7 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
public abstract HoodieWriteMetadata<O> delete(HoodieEngineContext context,
String instantTime, K keys);
/**
- * Delete records from Hoodie table based on {@link HoodieKey} and {@link
HoodieRecordLocation} specified in
+ * Delete records from Hoodie table based on {@link HoodieKey} and {@link
org.apache.hudi.common.model.HoodieRecordLocation} specified in
* preppedRecords.
*
* @param context {@link HoodieEngineContext}.
@@ -874,13 +874,13 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
}
public HoodieFileFormat getBaseFileFormat() {
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+ if (tableConfig.isMultipleBaseFileFormatsEnabled() &&
config.contains(HoodieWriteConfig.BASE_FILE_FORMAT)) {
+ return config.getBaseFileFormat();
+ }
return metaClient.getTableConfig().getBaseFileFormat();
}
- public HoodieFileFormat getLogFileFormat() {
- return metaClient.getTableConfig().getLogFileFormat();
- }
-
public Option<HoodieFileFormat> getPartitionMetafileFormat() {
return metaClient.getTableConfig().getPartitionMetafileFormat();
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
index 3e9e6b42a61..05f71454ed0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.Pair;
@@ -45,16 +46,16 @@ public class BootstrapUtils {
/**
* Returns leaf folders with files under a path.
- * @param metaClient Hoodie table metadata client
+ * @param baseFileFormat Hoodie base file format
* @param fs File System
 * @param context HoodieEngineContext
* @return list of partition paths with files under them.
* @throws IOException
*/
- public static List<Pair<String, List<HoodieFileStatus>>>
getAllLeafFoldersWithFiles(HoodieTableMetaClient metaClient,
- FileSystem fs, String basePathStr, HoodieEngineContext context) throws
IOException {
+ public static List<Pair<String, List<HoodieFileStatus>>>
getAllLeafFoldersWithFiles(HoodieFileFormat baseFileFormat,
+
FileSystem fs, String basePathStr, HoodieEngineContext context) throws
IOException {
final Path basePath = new Path(basePathStr);
- final String baseFileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ final String baseFileExtension = baseFileFormat.getFileExtension();
final Map<Integer, List<String>> levelToPartitions = new HashMap<>();
final Map<String, List<HoodieFileStatus>> partitionToFiles = new
HashMap<>();
PathFilter filePathFilter = getFilePathFilter(baseFileExtension);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
index af3c00d3d8e..27fea59fa9f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
@@ -329,12 +329,12 @@ public abstract class
BaseConsistentHashingBucketClusteringPlanStrategy<T extend
}
private long getSplitSize() {
- HoodieFileFormat format =
getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
+ HoodieFileFormat format = getHoodieTable().getBaseFileFormat();
return (long) (getWriteConfig().getMaxFileSize(format) *
getWriteConfig().getBucketSplitThreshold());
}
private long getMergeSize() {
- HoodieFileFormat format =
getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
+ HoodieFileFormat format = getHoodieTable().getBaseFileFormat();
return (long) (getWriteConfig().getMaxFileSize(format) *
getWriteConfig().getBucketMergeThreshold());
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 74e60b35bd0..2b383c1d246 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -95,7 +95,7 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan: " + config.getTableName());
HoodieTableType tableType = table.getMetaClient().getTableType();
- String baseFileExtension = getBaseFileExtension(metaClient);
+ String baseFileExtension = table.getBaseFileExtension();
Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(metaClient, instantToRollback);
Boolean isCommitMetadataCompleted =
checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
AtomicBoolean isCompaction = new AtomicBoolean(false);
@@ -191,10 +191,6 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
return
metaClient.getFs().listStatus(FSUtils.getPartitionPath(config.getBasePath(),
partitionPath), filter);
}
- private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
- return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
- }
-
@NotNull
private List<HoodieRollbackRequest> getHoodieRollbackRequests(String
partitionPath, FileStatus[] filesToDeletedStatus) {
return Arrays.stream(filesToDeletedStatus)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 831e11efae7..772afe71b02 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -18,8 +18,6 @@
package org.apache.hudi.table.upgrade;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.config.ConfigProperty;
@@ -39,6 +37,9 @@ import
org.apache.hudi.table.action.rollback.ListingBasedRollbackStrategy;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -137,6 +138,6 @@ public class ZeroToOneUpgradeHandler implements
UpgradeHandler {
String deltaInstant = FSUtils.getDeltaCommitTimeFromLogPath(logPath);
String writeToken = FSUtils.getWriteTokenFromLogPath(logPath);
- return FSUtils.makeBaseFileName(deltaInstant, writeToken, fileId,
table.getBaseFileFormat().getFileExtension());
+ return FSUtils.makeBaseFileName(deltaInstant, writeToken, fileId,
table.getBaseFileExtension());
}
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 6cff94068d6..475d0efc582 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -195,9 +194,8 @@ public class HoodieRowDataCreateHandle implements
Serializable {
} catch (IOException e) {
throw new HoodieIOException("Failed to make dir " + path, e);
}
- HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
return new Path(path.toString(), FSUtils.makeBaseFileName(instantTime,
getWriteToken(), fileId,
- tableConfig.getBaseFileFormat().getFileExtension()));
+ table.getBaseFileExtension()));
}
/**
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
index 02c407ba02d..ea13939ad2e 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
@@ -148,7 +148,7 @@ public class TestHoodieJavaWriteClientInsert extends
HoodieJavaClientTestHarness
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
metaClient = HoodieTableMetaClient.reload(metaClient);
- BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+ BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
// Get some records belong to the same partition (2021/09/11)
String insertRecordStr1 = "{\"_row_key\":\"1\","
@@ -222,7 +222,7 @@ public class TestHoodieJavaWriteClientInsert extends
HoodieJavaClientTestHarness
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
metaClient = HoodieTableMetaClient.reload(metaClient);
- BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+ BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
String partitionPath = "2021/09/11";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new
String[]{partitionPath});
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 1e8f5149d37..06446ae9138 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -39,7 +39,6 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
@@ -2763,10 +2762,6 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
// Metadata table is MOR
assertEquals(metadataMetaClient.getTableType(),
HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
- // Metadata table is HFile format
- assertEquals(metadataMetaClient.getTableConfig().getBaseFileFormat(),
HoodieFileFormat.HFILE,
- "Metadata Table base file format should be HFile");
-
// Metadata table has a fixed number of partitions
// Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as
that function filters all directory
// in the .hoodie folder.
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index 3330c5c7eed..a591134517f 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -62,7 +62,6 @@ import
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.MarkerUtils;
@@ -1021,7 +1020,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords,
List<WriteStatus> allStatus, List<GenericRecord> records) {
for (WriteStatus status : allStatus) {
Path filePath = new Path(basePath, status.getStat().getPath());
-
records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(hadoopConf,
filePath));
+
records.addAll(getFileUtilsInstance(metaClient).readAvroRecords(hadoopConf,
filePath));
}
Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
assertEquals(records.size(), expectedKeys.size());
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
index a3a233cb743..bda362931c7 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -129,7 +129,7 @@ public class TestJavaCopyOnWriteActionExecutor extends
HoodieJavaClientTestHarne
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
writeClient.startCommitWithTime(firstCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
- BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+ BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
String partitionPath = "2016/01/31";
@@ -476,7 +476,7 @@ public class TestJavaCopyOnWriteActionExecutor extends
HoodieJavaClientTestHarne
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
writeClient.startCommitWithTime(firstCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
- BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+ BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
String partitionPath = "2022/04/09";
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 7a373f093c0..09687e73a89 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -1046,4 +1046,8 @@ public abstract class HoodieJavaClientTestHarness extends
HoodieWriterClientTest
}
return builder;
}
+
+ public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient
metaClient) {
+ return
BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 92bee7ab141..884a7a6ab44 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -266,7 +266,7 @@ public class SparkBootstrapCommitActionExecutor<T>
*/
private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>>
listAndProcessSourcePartitions() throws IOException {
List<Pair<String, List<HoodieFileStatus>>> folders =
BootstrapUtils.getAllLeafFoldersWithFiles(
- table.getMetaClient(), bootstrapSourceFileSystem,
config.getBootstrapSourceBasePath(), context);
+ table.getBaseFileFormat(), bootstrapSourceFileSystem,
config.getBootstrapSourceBasePath(), context);
LOG.info("Fetching Bootstrap Schema !!");
HoodieBootstrapSchemaProvider sourceSchemaProvider = new
HoodieSparkBootstrapSchemaProvider(config);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 8f13e0cea48..44105a41983 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1200,7 +1200,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
- BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+ BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
// Inserts => will write file1
String commitTime1 = "001";
@@ -1313,7 +1313,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit,
false, mergeAllowDuplicateInserts); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
- BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+ BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
// Inserts => will write file1
String commitTime1 = "001";
@@ -1410,9 +1410,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
- assertEquals(100,
- BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf, new
Path(basePath, statuses.get(0).getStat().getPath()))
- .size(), "file should contain 100 records");
+ assertEquals(100, getFileUtilsInstance(metaClient).readRowKeys(hadoopConf,
new Path(basePath, statuses.get(0).getStat().getPath())).size(), "file should
contain 100 records");
// Delete 20 among 100 inserted
testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar);
@@ -2091,7 +2089,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords,
List<WriteStatus> allStatus, List<GenericRecord> records) {
for (WriteStatus status : allStatus) {
Path filePath = new Path(basePath, status.getStat().getPath());
-
records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(),
filePath));
+
records.addAll(getFileUtilsInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(),
filePath));
}
Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
assertEquals(records.size(), expectedKeys.size());
@@ -2180,10 +2178,10 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(expectedRecords,
- BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf,
newFile).size(),
+ getFileUtilsInstance(metaClient).readRowKeys(hadoopConf,
newFile).size(),
"file should contain 110 records");
- List<GenericRecord> records =
BaseFileUtils.getInstance(metaClient).readAvroRecords(hadoopConf, newFile);
+ List<GenericRecord> records =
getFileUtilsInstance(metaClient).readAvroRecords(hadoopConf, newFile);
for (GenericRecord record : records) {
String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertTrue(keys.contains(recordKey), "key expected to be part of " +
instantTime);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
index 83a6caecd19..cda4fa38d40 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
@@ -67,18 +67,14 @@ public class TestBootstrapUtils extends
HoodieClientTestBase {
}
});
- List<Pair<String, List<HoodieFileStatus>>> collected =
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient,
+ List<Pair<String, List<HoodieFileStatus>>> collected =
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
metaClient.getFs(), basePath, context);
assertEquals(3, collected.size());
- collected.stream().forEach(k -> {
- assertEquals(2, k.getRight().size());
- });
+ collected.forEach(k -> assertEquals(2, k.getRight().size()));
// Simulate reading from un-partitioned dataset
- collected = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient,
metaClient.getFs(), basePath + "/" + folders.get(0), context);
+ collected =
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
metaClient.getFs(), basePath + "/" + folders.get(0), context);
assertEquals(1, collected.size());
- collected.stream().forEach(k -> {
- assertEquals(2, k.getRight().size());
- });
+ collected.forEach(k -> assertEquals(2, k.getRight().size()));
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 24b66911613..4574b34393d 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -244,8 +244,7 @@ public class TestCopyOnWriteActionExecutor extends
HoodieClientTestBase implemen
// Check whether the record has been updated
Path updatedFilePath = allFiles[0].getPath();
- BloomFilter updatedFilter =
-
BaseFileUtils.getInstance(metaClient).readBloomFilterFromMetadata(hadoopConf,
updatedFilePath);
+ BloomFilter updatedFilter =
getFileUtilsInstance(metaClient).readBloomFilterFromMetadata(hadoopConf,
updatedFilePath);
for (HoodieRecord record : records) {
// No change to the _row_key
assertTrue(updatedFilter.mightContain(record.getRecordKey()));
@@ -542,7 +541,7 @@ public class TestCopyOnWriteActionExecutor extends
HoodieClientTestBase implemen
Option<Path> metafilePath =
HoodiePartitionMetadata.getPartitionMetafilePath(fs, partitionPath);
if (partitionMetafileUseBaseFormat) {
// Extension should be the same as the data file format of the table
-
assertTrue(metafilePath.get().toString().endsWith(table.getBaseFileFormat().getFileExtension()));
+
assertTrue(metafilePath.get().toString().endsWith(table.getBaseFileExtension()));
} else {
// No extension as it is in properties file format
assertTrue(metafilePath.get().toString().endsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 01cfcd047b4..92f2b2e1438 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -950,7 +950,7 @@ public class TestHoodieSparkMergeOnReadTableRollback
extends SparkClientFunction
return records;
}
- private long doCompaction(SparkRDDWriteClient client, HoodieTableMetaClient
metaClient, HoodieWriteConfig cfg, long numLogFiles) throws IOException {
+ private long doCompaction(SparkRDDWriteClient client, HoodieTableMetaClient
metaClient, HoodieWriteConfig cfg, long numLogFiles) {
// Do a compaction
String instantTime =
client.scheduleCompaction(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
client.compact(instantTime);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index c4a150e7f8f..39c77de3f26 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
@@ -622,4 +623,8 @@ public class HoodieClientTestBase extends
HoodieSparkClientTestHarness {
public HoodieCleanStat getCleanStat(List<HoodieCleanStat>
hoodieCleanStatsTwo, String partitionPath) {
return hoodieCleanStatsTwo.stream().filter(e ->
e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
}
+
+ public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient
metaClient) {
+ return
BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
index 54b4972880f..2a83baa018c 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -602,10 +601,6 @@ public abstract class HoodieSparkClientTestHarness extends
HoodieWriterClientTes
// Metadata table is MOR
assertEquals(metadataMetaClient.getTableType(),
HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
- // Metadata table is HFile format
- assertEquals(metadataMetaClient.getTableConfig().getBaseFileFormat(),
HoodieFileFormat.HFILE,
- "Metadata Table base file format should be HFile");
-
// Metadata table has a fixed number of partitions
// Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that
function filters all directory
// in the .hoodie folder.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index c8c94e5db3d..d7c25b82fad 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -64,4 +64,13 @@ public enum HoodieFileFormat {
public String getFileExtension() {
return extension;
}
+
+ public static HoodieFileFormat fromFileExtension(String extension) {
+ for (HoodieFileFormat format : HoodieFileFormat.values()) {
+ if (format.getFileExtension().equals(extension)) {
+ return format;
+ }
+ }
+ throw new IllegalArgumentException("Unknown file extension :" + extension);
+ }
}
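The new HoodieFileFormat#fromFileExtension maps a dot-prefixed extension back to the enum, which is what a mixed-format file slice needs once the format can no longer be assumed from the table config alone. A small sketch (the file name is hypothetical):

    // Resolve a base file's format from its extension instead of the table-level setting.
    String fileName = "f1a2b3c4_0-1-0_20231026092702.orc";                       // hypothetical
    HoodieFileFormat format =
        HoodieFileFormat.fromFileExtension(FSUtils.getFileExtension(fileName));  // -> ORC
    // Unrecognized extensions (e.g. ".csv") throw IllegalArgumentException.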
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 29869730367..27aaf3324ed 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -244,6 +244,12 @@ public class HoodieTableConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("When set to true, will not write the partition
columns into hudi. By default, false.");
+ public static final ConfigProperty<Boolean>
MULTIPLE_BASE_FILE_FORMATS_ENABLE = ConfigProperty
+ .key("hoodie.table.multiple.base.file.formats.enable")
+ .defaultValue(false)
+ .sinceVersion("1.0.0")
+ .withDocumentation("When set to true, the table can support reading and
writing multiple base file formats.");
+
public static final ConfigProperty<String> URL_ENCODE_PARTITIONING =
KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE =
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
@@ -747,6 +753,10 @@ public class HoodieTableConfig extends HoodieConfig {
return getBooleanOrDefault(DROP_PARTITION_COLUMNS);
}
+ public boolean isMultipleBaseFileFormatsEnabled() {
+ return getBooleanOrDefault(MULTIPLE_BASE_FILE_FORMATS_ENABLE);
+ }
+
/**
* Read the table checksum.
*/
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index cee950592b4..2a989764120 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -158,8 +158,7 @@ public class HoodieTableMetaClient implements Serializable {
}
this.timelineLayoutVersion = layoutVersion.orElseGet(() ->
tableConfig.getTimelineLayoutVersion().get());
this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
- LOG.info("Finished Loading Table of type " + tableType + "(version=" +
timelineLayoutVersion + ", baseFileFormat="
- + this.tableConfig.getBaseFileFormat() + ") from " + basePath);
+ LOG.info("Finished Loading Table of type " + tableType + "(version=" +
timelineLayoutVersion + ") from " + basePath);
if (loadActiveTimelineOnLoad) {
LOG.info("Loading Active commit timeline for " + basePath);
getActiveTimeline();
@@ -867,6 +866,7 @@ public class HoodieTableMetaClient implements Serializable {
private String metadataPartitions;
private String inflightMetadataPartitions;
private String secondaryIndexesMetadata;
+ private Boolean multipleBaseFileFormatsEnabled;
/**
* Persist the configs that is written at the first time, and should not
be changed.
@@ -1031,6 +1031,15 @@ public class HoodieTableMetaClient implements
Serializable {
return this;
}
+ public PropertyBuilder setMultipleBaseFileFormatsEnabled(Boolean
multipleBaseFileFormatsEnabled) {
+ this.multipleBaseFileFormatsEnabled = multipleBaseFileFormatsEnabled;
+ return this;
+ }
+
+ public PropertyBuilder setBaseFileFormats(String baseFileFormats) {
+ return this;
+ }
+
public PropertyBuilder set(Map<String, Object> props) {
for (ConfigProperty<String> configProperty :
HoodieTableConfig.PERSISTED_CONFIG_LIST) {
if (containsConfigProperty(props, configProperty)) {
@@ -1155,6 +1164,9 @@ public class HoodieTableMetaClient implements
Serializable {
if (hoodieConfig.contains(HoodieTableConfig.SECONDARY_INDEXES_METADATA))
{
setSecondaryIndexesMetadata(hoodieConfig.getString(HoodieTableConfig.SECONDARY_INDEXES_METADATA));
}
+ if
(hoodieConfig.contains(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE)) {
+
setMultipleBaseFileFormatsEnabled(hoodieConfig.getBoolean(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE));
+ }
return this;
}
@@ -1263,6 +1275,9 @@ public class HoodieTableMetaClient implements
Serializable {
if (null != secondaryIndexesMetadata) {
tableConfig.setValue(HoodieTableConfig.SECONDARY_INDEXES_METADATA,
secondaryIndexesMetadata);
}
+ if (null != multipleBaseFileFormatsEnabled) {
+
tableConfig.setValue(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE,
Boolean.toString(multipleBaseFileFormatsEnabled));
+ }
return tableConfig.getProps();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index be41857a38e..278729f3d78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -65,10 +64,6 @@ public abstract class BaseFileUtils {
throw new UnsupportedOperationException(fileFormat.name() + " format not
supported yet.");
}
- public static BaseFileUtils getInstance(HoodieTableMetaClient metaClient) {
- return getInstance(metaClient.getTableConfig().getBaseFileFormat());
- }
-
/**
* Read the rowKey list from the given data file.
*
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index b5f4ea5726f..612929bc8a6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -545,6 +545,33 @@ public class TestFSUtils extends HoodieCommonTestHarness {
.collect(Collectors.toSet()));
}
+ @Test
+ public void testGetFileExtension() {
+ String pathWithExtension = "/path/to/some/file/sample.parquet";
+ String pathWithoutExtension = "/path/to/some/file/sample";
+ String justFileNameWithExtension = "sample.orc";
+ String justFileNameWithoutExtension = "sample";
+
+ // file with extension
+ String result1 = FSUtils.getFileExtension(pathWithExtension);
+ assertEquals(".parquet", result1);
+
+ // file without extension
+ String result2 = FSUtils.getFileExtension(pathWithoutExtension);
+ assertEquals("", result2);
+
+ // just a file name with extension
+ String result3 = FSUtils.getFileExtension(justFileNameWithExtension);
+ assertEquals(".orc", result3);
+
+ // just a file name without extension
+ String result4 = FSUtils.getFileExtension(justFileNameWithoutExtension);
+ assertEquals("", result4);
+
+ // null input
+ assertThrows(NullPointerException.class, () ->
FSUtils.getFileExtension(null));
+ }
+
private Path getHoodieTempDir() {
return new Path(baseUri.toString(), ".hoodie/.temp");
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 81e7d993d55..202827ce0c7 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -50,7 +50,6 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -789,7 +788,7 @@ public class HoodieTestTable {
}
public FileStatus[] listAllBaseFiles() throws IOException {
- return
listAllBaseFiles(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension());
+ return listAllBaseFiles(HoodieFileFormat.PARQUET.getFileExtension());
}
public FileStatus[] listAllBaseFiles(String fileExtension) throws
IOException {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index f3b32b84017..65bb8881455 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -90,9 +90,7 @@ case class BaseFileOnlyRelation(override val sqlContext:
SQLContext,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
filters: Array[Filter]): RDD[InternalRow]
= {
- val (partitionSchema, dataSchema, requiredDataSchema) =
- tryPrunePartitionColumns(tableSchema, requiredSchema)
-
+ val (partitionSchema, dataSchema, requiredDataSchema) =
tryPrunePartitionColumns(tableSchema, requiredSchema)
val baseFileReader = createBaseFileReader(
spark = sparkSession,
partitionSchema = partitionSchema,
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index f982fb1e1c3..965340c637a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -23,8 +23,7 @@ import
org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPER
import org.apache.hudi.cdc.CDCRelation
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
-import org.apache.hudi.common.model.WriteConcurrencyMode
-import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.model.{HoodieTableType, WriteConcurrencyMode}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
@@ -118,11 +117,6 @@ class DefaultSource extends RelationProvider
DefaultSource.createRelation(sqlContext, metaClient, schema, globPaths,
parameters)
}
- def getValidCommits(metaClient: HoodieTableMetaClient): String = {
- metaClient
-
.getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",")
- }
-
/**
* This DataSource API is used for writing the DataFrame at the destination.
For now, we are returning a dummy
* relation here because Spark does not really make use of the relation
returned, and just returns an empty
@@ -227,6 +221,7 @@ object DefaultSource {
val queryType = parameters(QUERY_TYPE.key)
val isCdcQuery = queryType == QUERY_TYPE_INCREMENTAL_OPT_VAL &&
parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)
+ val isMultipleBaseFileFormatsEnabled =
metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled
log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is:
$tableType, queryType is: $queryType")
@@ -245,16 +240,24 @@ object DefaultSource {
} else if (isCdcQuery) {
CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
} else {
- lazy val newHudiFileFormatUtils = if
(parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key,
- USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean && (globPaths
== null || globPaths.isEmpty)
+ lazy val fileFormatUtils = if ((isMultipleBaseFileFormatsEnabled &&
!isBootstrappedTable)
+ || (parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key,
USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean
+ && (globPaths == null || globPaths.isEmpty)
&& parameters.getOrElse(REALTIME_MERGE.key(),
REALTIME_MERGE.defaultValue())
- .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)) {
- val formatUtils = new NewHoodieParquetFileFormatUtils(sqlContext,
metaClient, parameters, userSchema)
+ .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL))) {
+ val formatUtils = new HoodieSparkFileFormatUtils(sqlContext,
metaClient, parameters, userSchema)
if (formatUtils.hasSchemaOnRead) Option.empty else Some(formatUtils)
} else {
Option.empty
}
+ if (isMultipleBaseFileFormatsEnabled) {
+ if (isBootstrappedTable) {
+ throw new HoodieException(s"Multiple base file formats are not
supported for bootstrapped table")
+ }
+ resolveMultiFileFormatRelation(tableType, queryType,
fileFormatUtils.get)
+ }
+
(tableType, queryType, isBootstrappedTable) match {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
@@ -265,27 +268,27 @@ object DefaultSource {
new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
- if (newHudiFileFormatUtils.isEmpty) {
+ if (fileFormatUtils.isEmpty) {
new MergeOnReadSnapshotRelation(sqlContext, parameters,
metaClient, globPaths, userSchema)
} else {
- newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true,
isBootstrap = false)
+ fileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap
= false)
}
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters,
metaClient, userSchema)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
- if (newHudiFileFormatUtils.isEmpty) {
+ if (fileFormatUtils.isEmpty) {
new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths,
metaClient, parameters)
} else {
- newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true,
isBootstrap = true)
+ fileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap
= true)
}
case (_, _, true) =>
- if (newHudiFileFormatUtils.isEmpty) {
+ if (fileFormatUtils.isEmpty) {
resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema,
metaClient, parameters)
} else {
- newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = false,
isBootstrap = true)
+ fileFormatUtils.get.getHadoopFsRelation(isMOR = false, isBootstrap
= true)
}
case (_, _, _) =>
@@ -332,6 +335,21 @@ object DefaultSource {
}
}
+ private def resolveMultiFileFormatRelation(tableType: HoodieTableType,
+ queryType: String,
+ fileFormatUtils:
HoodieSparkFileFormatUtils): BaseRelation = {
+ (tableType, queryType) match {
+ case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL) |
+ (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) =>
+ fileFormatUtils.getHadoopFsRelation(isMOR = false, isBootstrap = false)
+ case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) |
+ (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) =>
+ fileFormatUtils.getHadoopFsRelation(isMOR = true, isBootstrap = false)
+ case (_, _) =>
+ throw new HoodieException(s"Multiple base file formats not supported
for query type : $queryType for tableType: $tableType")
+ }
+ }
+
private def resolveSchema(metaClient: HoodieTableMetaClient,
parameters: Map[String, String],
schema: Option[StructType]): StructType = {
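On the read side, when hoodie.table.multiple.base.file.formats.enable is set, DefaultSource now resolves snapshot and read-optimized queries through the HoodieMultipleBaseFileFormat-backed HadoopFsRelation, while other query types and bootstrapped tables throw HoodieException. A plain read is unchanged from the caller's perspective; a sketch with a hypothetical base path and an existing SparkSession:

    // Snapshot query on a multi-format table; format routing happens inside DefaultSource.
    Dataset<Row> df = spark.read()
        .format("hudi")
        .load("/tmp/hudi/multi_format_table");               // hypothetical base path
    df.show();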
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 9ace93ed495..c791e8417ca 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -55,7 +55,6 @@ import
org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpr
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Expression,
SubqueryExpression}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
@@ -516,6 +515,99 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
*/
def updatePrunedDataSchema(prunedSchema: StructType): Relation
+ protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
+ requiredSchema: HoodieTableSchema,
+ requestedColumns: Array[String],
+ requiredFilters: Seq[Filter],
+ optionalFilters: Seq[Filter] = Seq.empty,
+ baseFileFormat: HoodieFileFormat =
tableConfig.getBaseFileFormat): HoodieMergeOnReadBaseFileReaders = {
+ val (partitionSchema, dataSchema, requiredDataSchema) =
+ tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+ val fullSchemaReader = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ partitionSchema = partitionSchema,
+ dataSchema = dataSchema,
+ requiredDataSchema = dataSchema,
+ // This file-reader is used to read base file records, subsequently
merging them with the records
+ // stored in delta-log files. As such, we have to read _all_ records
from the base file, while avoiding
+ // applying any filtering _before_ we complete combining them w/
delta-log records (to make sure that
+ // we combine them correctly);
+ // As such only required filters could be pushed-down to such reader
+ filters = requiredFilters,
+ options = optParams,
+ // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
+ // to configure Parquet reader appropriately
+ hadoopConf = embedInternalSchema(new Configuration(conf),
internalSchemaOpt),
+ baseFileFormat = baseFileFormat
+ )
+
+ val requiredSchemaReader = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ partitionSchema = partitionSchema,
+ dataSchema = dataSchema,
+ requiredDataSchema = requiredDataSchema,
+ // This file-reader is used to read base file records, subsequently
merging them with the records
+ // stored in delta-log files. As such, we have to read _all_ records
from the base file, while avoiding
+ // applying any filtering _before_ we complete combining them w/
delta-log records (to make sure that
+ // we combine them correctly);
+ // As such only required filters could be pushed-down to such reader
+ filters = requiredFilters,
+ options = optParams,
+ // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
+ // to configure Parquet reader appropriately
+ hadoopConf = embedInternalSchema(new Configuration(conf),
requiredDataSchema.internalSchema),
+ baseFileFormat = baseFileFormat
+ )
+
+ // Check whether fields required for merging were also requested to be
fetched
+ // by the query:
+ // - In case they were, there's no optimization we could apply here (we
will have
+ // to fetch such fields)
+ // - In case they were not, we will provide 2 separate file-readers
+ // a) One which would be applied to file-groups w/ delta-logs
(merging)
+ // b) One which would be applied to file-groups w/ no delta-logs or
+ // in case query-mode is skipping merging
+ val mandatoryColumns =
mandatoryFields.map(HoodieAvroUtils.getRootLevelFieldName)
+ if (mandatoryColumns.forall(requestedColumns.contains)) {
+ HoodieMergeOnReadBaseFileReaders(
+ fullSchemaReader = fullSchemaReader,
+ requiredSchemaReader = requiredSchemaReader,
+ requiredSchemaReaderSkipMerging = requiredSchemaReader
+ )
+ } else {
+ val prunedRequiredSchema = {
+ val unusedMandatoryColumnNames =
mandatoryColumns.filterNot(requestedColumns.contains)
+ val prunedStructSchema =
+ StructType(requiredDataSchema.structTypeSchema.fields
+ .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
+
+ HoodieTableSchema(prunedStructSchema,
convertToAvroSchema(prunedStructSchema, tableName).toString)
+ }
+
+ val requiredSchemaReaderSkipMerging = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ partitionSchema = partitionSchema,
+ dataSchema = dataSchema,
+ requiredDataSchema = prunedRequiredSchema,
+ // This file-reader is only used in cases when no merging is
performed, therefore it's safe to push
+ // down these filters to the base file readers
+ filters = requiredFilters ++ optionalFilters,
+ options = optParams,
+ // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
+ // to configure Parquet reader appropriately
+ hadoopConf = embedInternalSchema(new Configuration(conf),
requiredDataSchema.internalSchema),
+ baseFileFormat = baseFileFormat
+ )
+
+ HoodieMergeOnReadBaseFileReaders(
+ fullSchemaReader = fullSchemaReader,
+ requiredSchemaReader = requiredSchemaReader,
+ requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
+ )
+ }
+ }
+
/**
* Returns file-reader routine accepting [[PartitionedFile]] and returning
an [[Iterator]]
* over [[InternalRow]]
@@ -527,16 +619,15 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration,
- shouldAppendPartitionValuesOverride:
Option[Boolean] = None): BaseFileReader = {
- val tableBaseFileFormat = tableConfig.getBaseFileFormat
-
+ shouldAppendPartitionValuesOverride:
Option[Boolean] = None,
+ baseFileFormat: HoodieFileFormat =
tableConfig.getBaseFileFormat): BaseFileReader = {
// NOTE: PLEASE READ CAREFULLY
// Lambda returned from this method is going to be invoked on the
executor, and therefore
// we have to eagerly initialize all of the readers even though only
one specific to the type
// of the file being read will be used. This is required to avoid
serialization of the whole
// relation (containing file-index for ex) and passing it to the
executor
val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType)
=
- tableBaseFileFormat match {
+ baseFileFormat match {
case HoodieFileFormat.PARQUET =>
val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = spark,
@@ -571,17 +662,17 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
(hfileReader, requiredDataSchema.structTypeSchema)
- case _ => throw new UnsupportedOperationException(s"Base file format is
not currently supported ($tableBaseFileFormat)")
+ case _ => throw new UnsupportedOperationException(s"Base file format is
not currently supported ($baseFileFormat)")
}
BaseFileReader(
read = partitionedFile => {
val filePathString =
sparkAdapter.getSparkPartitionedFileUtils.getStringPathFromPartitionedFile(partitionedFile)
val extension = FSUtils.getFileExtension(filePathString)
- if (tableBaseFileFormat.getFileExtension.equals(extension)) {
+ if (baseFileFormat.getFileExtension.equals(extension)) {
read(partitionedFile)
} else {
- throw new UnsupportedOperationException(s"Invalid base-file format
($extension), expected ($tableBaseFileFormat)")
+ throw new UnsupportedOperationException(s"Invalid base-file format
($extension), expected ($baseFileFormat)")
}
},
schema = schema
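
For reference, a minimal, self-contained sketch (hypothetical names, not the Hudi API) of the pattern the extended createBaseFileReader now follows: the per-format reader is built eagerly, and at read time the file extension is checked against the format the reader was built for before delegating.

object BaseFileReaderSketch {
  sealed trait SketchFileFormat { def extension: String }
  case object Parquet extends SketchFileFormat { val extension = ".parquet" }
  case object Orc extends SketchFileFormat { val extension = ".orc" }

  // A reader maps a file path to its rows (stubbed here as strings).
  type Reader = String => Iterator[String]

  // Build the per-format reader eagerly, then guard on the extension at read time,
  // mirroring the extension check in the updated createBaseFileReader.
  def buildReader(format: SketchFileFormat): Reader = {
    val read: Reader = format match {
      case Parquet => path => Iterator(s"parquet row from $path")
      case Orc     => path => Iterator(s"orc row from $path")
    }
    (path: String) => {
      if (path.endsWith(format.extension)) read(path)
      else throw new UnsupportedOperationException(
        s"Invalid base-file format ($path), expected (${format.extension})")
    }
  }

  def main(args: Array[String]): Unit =
    println(buildReader(Parquet)("part-0001.parquet").mkString(", "))
}
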
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
similarity index 87%
rename from
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
rename to
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
index a76d4bfc77f..e66b248e0ab 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
@@ -24,9 +24,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation._
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.{ConfigProperty, HoodieReaderConfig}
-import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
-import
org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkState
@@ -37,7 +35,7 @@ import
org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.spark.sql.catalyst.analysis.Resolver
import
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
NewHoodieParquetFileFormat}
-import org.apache.spark.sql.execution.datasources.{FileStatusCache,
HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{FileStatusCache,
HadoopFsRelation, HoodieMultipleBaseFileFormat}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
@@ -46,10 +44,10 @@ import org.apache.spark.sql.{SQLContext, SparkSession}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
-class NewHoodieParquetFileFormatUtils(val sqlContext: SQLContext,
- val metaClient: HoodieTableMetaClient,
- val optParamsInput: Map[String, String],
- private val schemaSpec:
Option[StructType]) extends SparkAdapterSupport {
+class HoodieSparkFileFormatUtils(val sqlContext: SQLContext,
+ val metaClient: HoodieTableMetaClient,
+ val optParamsInput: Map[String, String],
+ private val schemaSpec: Option[StructType])
extends SparkAdapterSupport {
protected val sparkSession: SparkSession = sqlContext.sparkSession
protected val optParams: Map[String, String] = optParamsInput.filter(kv =>
!kv._1.equals(DATA_QUERIES_ONLY.key()))
@@ -208,18 +206,27 @@ class NewHoodieParquetFileFormatUtils(val sqlContext:
SQLContext,
Seq.empty
}
fileIndex.shouldEmbedFileSlices = true
- val fileGroupReaderBasedFileFormat = new
HoodieFileGroupReaderBasedParquetFileFormat(
- tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, isBootstrap, shouldUseRecordPosition)
- val newHoodieParquetFileFormat = new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
- sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, isBootstrap)
+
+ val fileFormat = if (fileGroupReaderEnabled) {
+ new HoodieFileGroupReaderBasedParquetFileFormat(
+ tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, isBootstrap, shouldUseRecordPosition)
+ } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
+ new
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
+
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR)
+ } else {
+ new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, isBootstrap)
+ }
+
HadoopFsRelation(
location = fileIndex,
partitionSchema = fileIndex.partitionSchema,
dataSchema = fileIndex.dataSchema,
bucketSpec = None,
- fileFormat = if (fileGroupReaderEnabled) fileGroupReaderBasedFileFormat
else newHoodieParquetFileFormat,
+ fileFormat = fileFormat,
optParams)(sparkSession)
}
}
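
In plain terms, the selection above prefers the file-group reader when it is enabled, falls back to the multi-format reader for non-bootstrap tables with multiple base file formats enabled, and otherwise keeps the existing parquet path. A simplified restatement (strings stand in for the actual FileFormat instances):

object FileFormatSelectionSketch {
  // Simplified restatement of the selection order above; not the actual classes.
  def selectFileFormat(fileGroupReaderEnabled: Boolean,
                       multipleBaseFileFormatsEnabled: Boolean,
                       isBootstrap: Boolean): String =
    if (fileGroupReaderEnabled) "HoodieFileGroupReaderBasedParquetFileFormat"
    else if (multipleBaseFileFormatsEnabled && !isBootstrap) "HoodieMultipleBaseFileFormat"
    else "NewHoodieParquetFileFormat"
}
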
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index b2c44cc3330..e2c5ad88d7f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -169,9 +169,12 @@ object HoodieWriterUtils {
val resolver = spark.sessionState.conf.resolver
val diffConfigs = StringBuilder.newBuilder
params.foreach { case (key, value) =>
- val existingValue =
getStringFromTableConfigWithAlternatives(tableConfig, key)
- if (null != existingValue && !resolver(existingValue, value)) {
- diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+ // Base file format can change between writes, so ignore it.
+ if (!HoodieTableConfig.BASE_FILE_FORMAT.key.equals(key)) {
+ val existingValue =
getStringFromTableConfigWithAlternatives(tableConfig, key)
+ if (null != existingValue && !resolver(existingValue, value)) {
+
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+ }
}
}
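
A sketch of the resulting validation behavior, assuming the table config is a plain Map and that the string below matches HoodieTableConfig.BASE_FILE_FORMAT.key; mismatches on that one key are no longer reported.

object ValidateTableConfigSketch {
  // Assumed value of HoodieTableConfig.BASE_FILE_FORMAT.key.
  val BaseFileFormatKey = "hoodie.table.base.file.format"

  // Collect "key: incoming vs existing" entries for every mismatching config,
  // skipping the base file format since it may now differ between writes.
  def diffConfigs(params: Map[String, String], tableConfig: Map[String, String]): Seq[String] =
    params.toSeq.collect {
      case (key, value)
        if key != BaseFileFormatKey &&
          tableConfig.get(key).exists(existing => existing != value) =>
        s"$key:\t$value\t${tableConfig(key)}"
    }

  def main(args: Array[String]): Unit = {
    val table  = Map(BaseFileFormatKey -> "PARQUET", "hoodie.table.name" -> "hoodie_test")
    val params = Map(BaseFileFormatKey -> "ORC", "hoodie.table.name" -> "hoodie_test")
    println(diffConfigs(params, table)) // List() -- the format switch is tolerated
  }
}
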
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index d80ce1a9cba..4dda08c2e28 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -17,7 +17,7 @@
package org.apache.hudi
-import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
+import org.apache.hadoop.fs.{FileStatus, GlobPattern}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
@@ -181,7 +181,7 @@ trait HoodieIncrementalRelationTrait extends
HoodieBaseRelation {
protected lazy val commitsMetadata =
includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
protected lazy val affectedFilesInCommits: Array[FileStatus] = {
- listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath),
commitsMetadata)
+ listAffectedFilesForCommits(conf, metaClient.getBasePathV2,
commitsMetadata)
}
protected lazy val (includeStartTime, startTs) = if (startInstantArchived) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 31d64d50e45..8808d73ae1a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -18,12 +18,9 @@
package org.apache.hudi
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.MergeOnReadSnapshotRelation.{createPartitionedFile,
isProjectionCompatible}
-import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile,
OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.rdd.RDD
@@ -125,95 +122,6 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext:
SQLContext,
fileSplits = fileSplits)
}
- protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- requestedColumns: Array[String],
- requiredFilters: Seq[Filter],
- optionalFilters: Seq[Filter] =
Seq.empty): HoodieMergeOnReadBaseFileReaders = {
- val (partitionSchema, dataSchema, requiredDataSchema) =
- tryPrunePartitionColumns(tableSchema, requiredSchema)
-
- val fullSchemaReader = createBaseFileReader(
- spark = sqlContext.sparkSession,
- partitionSchema = partitionSchema,
- dataSchema = dataSchema,
- requiredDataSchema = dataSchema,
- // This file-reader is used to read base file records, subsequently
merging them with the records
- // stored in delta-log files. As such, we have to read _all_ records
from the base file, while avoiding
- // applying any filtering _before_ we complete combining them w/
delta-log records (to make sure that
- // we combine them correctly);
- // As such only required filters could be pushed-down to such reader
- filters = requiredFilters,
- options = optParams,
- // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
- // to configure Parquet reader appropriately
- hadoopConf = embedInternalSchema(new Configuration(conf),
internalSchemaOpt)
- )
-
- val requiredSchemaReader = createBaseFileReader(
- spark = sqlContext.sparkSession,
- partitionSchema = partitionSchema,
- dataSchema = dataSchema,
- requiredDataSchema = requiredDataSchema,
- // This file-reader is used to read base file records, subsequently
merging them with the records
- // stored in delta-log files. As such, we have to read _all_ records
from the base file, while avoiding
- // applying any filtering _before_ we complete combining them w/
delta-log records (to make sure that
- // we combine them correctly);
- // As such only required filters could be pushed-down to such reader
- filters = requiredFilters,
- options = optParams,
- // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
- // to configure Parquet reader appropriately
- hadoopConf = embedInternalSchema(new Configuration(conf),
requiredDataSchema.internalSchema)
- )
-
- // Check whether fields required for merging were also requested to be
fetched
- // by the query:
- // - In case they were, there's no optimization we could apply here (we
will have
- // to fetch such fields)
- // - In case they were not, we will provide 2 separate file-readers
- // a) One which would be applied to file-groups w/ delta-logs
(merging)
- // b) One which would be applied to file-groups w/ no delta-logs or
- // in case query-mode is skipping merging
- val mandatoryColumns =
mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
- if (mandatoryColumns.forall(requestedColumns.contains)) {
- HoodieMergeOnReadBaseFileReaders(
- fullSchemaReader = fullSchemaReader,
- requiredSchemaReader = requiredSchemaReader,
- requiredSchemaReaderSkipMerging = requiredSchemaReader
- )
- } else {
- val prunedRequiredSchema = {
- val unusedMandatoryColumnNames =
mandatoryColumns.filterNot(requestedColumns.contains)
- val prunedStructSchema =
- StructType(requiredDataSchema.structTypeSchema.fields
- .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
-
- HoodieTableSchema(prunedStructSchema,
convertToAvroSchema(prunedStructSchema, tableName).toString)
- }
-
- val requiredSchemaReaderSkipMerging = createBaseFileReader(
- spark = sqlContext.sparkSession,
- partitionSchema = partitionSchema,
- dataSchema = dataSchema,
- requiredDataSchema = prunedRequiredSchema,
- // This file-reader is only used in cases when no merging is
performed, therefore it's safe to push
- // down these filters to the base file readers
- filters = requiredFilters ++ optionalFilters,
- options = optParams,
- // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
- // to configure Parquet reader appropriately
- hadoopConf = embedInternalSchema(new Configuration(conf),
requiredDataSchema.internalSchema)
- )
-
- HoodieMergeOnReadBaseFileReaders(
- fullSchemaReader = fullSchemaReader,
- requiredSchemaReader = requiredSchemaReader,
- requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
- )
- }
- }
-
protected override def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
val convertedPartitionFilters =
HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient,
partitionFilters)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 772dd27e279..01fa4f7e39b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -126,9 +126,9 @@ class HoodieCatalogTable(val spark: SparkSession, var
table: CatalogTable) exten
lazy val partitionFields: Array[String] =
tableConfig.getPartitionFields.orElse(Array.empty)
/**
- * BaseFileFormat
+ * Whether multiple base file formats are enabled for the table
*/
- lazy val baseFileFormat: String =
metaClient.getTableConfig.getBaseFileFormat.name()
+ lazy val isMultipleBaseFileFormatsEnabled: Boolean =
tableConfig.isMultipleBaseFileFormatsEnabled
/**
* Firstly try to load table schema from meta directory on filesystem.
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
new file mode 100644
index 00000000000..c250a875f2b
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
+import
org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL,
REALTIME_SKIP_MERGE_OPT_VAL}
+import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.{HoodieBaseRelation, HoodieTableSchema,
HoodieTableState, LogFileIterator, MergeOnReadSnapshotRelation,
PartitionFileSliceMapping, RecordMergingFileIterator, SparkAdapterSupport}
+import org.apache.spark.broadcast.Broadcast
+import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.asScalaIteratorConverter
+
+/**
+ * File format that supports reading multiple base file formats in a table.
+ */
+class HoodieMultipleBaseFileFormat(tableState: Broadcast[HoodieTableState],
+ tableSchema: Broadcast[HoodieTableSchema],
+ tableName: String,
+ mergeType: String,
+ mandatoryFields: Seq[String],
+ isMOR: Boolean) extends FileFormat with
SparkAdapterSupport {
+ private val parquetFormat = new ParquetFileFormat()
+ private val orcFormat = new OrcFileFormat()
+
+ override def inferSchema(sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ // This is a simple heuristic assuming all files have the same extension.
+ val fileFormat = detectFileFormat(files.head.getPath.toString)
+
+ fileFormat match {
+ case "parquet" => parquetFormat.inferSchema(sparkSession, options, files)
+ case "orc" => orcFormat.inferSchema(sparkSession, options, files)
+ case _ => throw new UnsupportedOperationException(s"File format
$fileFormat is not supported.")
+ }
+ }
+
+ override def isSplitable(sparkSession: SparkSession, options: Map[String,
String], path: Path): Boolean = {
+ false
+ }
+
+ // Used so that the planner only projects once and does not stack overflow
+ var isProjected = false
+
+ /**
+ * supportBatch needs to return a consistent result, even if one side of a bootstrap
merge supports vectorized reads
+ * while the other side does not
+ */
+ private var supportBatchCalled = false
+ private var supportBatchResult = false
+
+ override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
+ if (!supportBatchCalled) {
+ supportBatchCalled = true
+ supportBatchResult =
+ !isMOR && parquetFormat.supportBatch(sparkSession, schema) &&
orcFormat.supportBatch(sparkSession, schema)
+ }
+ supportBatchResult
+ }
+
+ override def prepareWrite(sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ throw new UnsupportedOperationException("Write operations are not
supported in this example.")
+ }
+
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
+ val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
+ val requiredSchemaWithMandatory = if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
+ // add mandatory fields to required schema
+ val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
+ for (field <- mandatoryFields) {
+ if (requiredSchema.getFieldIndex(field).isEmpty) {
+ val fieldToAdd =
dataSchema.fields(dataSchema.getFieldIndex(field).get)
+ added.append(fieldToAdd)
+ }
+ }
+ val addedFields = StructType(added.toArray)
+ StructType(requiredSchema.toArray ++ addedFields.fields)
+ } else {
+ dataSchema
+ }
+
+ val (parquetBaseFileReader, orcBaseFileReader,
preMergeParquetBaseFileReader, preMergeOrcBaseFileReader) = buildFileReaders(
+ sparkSession, dataSchema, partitionSchema, requiredSchema, filters,
options, hadoopConf, requiredSchemaWithMandatory)
+
+ val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
+ (file: PartitionedFile) => {
+ val filePath =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+ val fileFormat = detectFileFormat(filePath.toString)
+ file.partitionValues match {
+ case fileSliceMapping: PartitionFileSliceMapping =>
+ if (FSUtils.isLogFile(filePath)) {
+ // no base file
+ val fileSlice =
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
+ val logFiles = getLogFilesFromSlice(fileSlice)
+ val outputAvroSchema =
HoodieBaseRelation.convertToAvroSchema(outputSchema, tableName)
+ new LogFileIterator(logFiles, filePath.getParent,
tableSchema.value, outputSchema, outputAvroSchema,
+ tableState.value, broadcastedHadoopConf.value.value)
+ } else {
+ // We do not broadcast the slice if it has no log files
+ fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName))
match {
+ case Some(fileSlice) =>
+ val hoodieBaseFile = fileSlice.getBaseFile.get()
+ val baseFileFormat =
detectFileFormat(hoodieBaseFile.getFileName)
+ val partitionValues = fileSliceMapping.getInternalRow
+ val logFiles = getLogFilesFromSlice(fileSlice)
+ if (requiredSchemaWithMandatory.isEmpty) {
+ val baseFile = createPartitionedFile(partitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
+ baseFileFormat match {
+ case "parquet" => parquetBaseFileReader(baseFile)
+ case "orc" => orcBaseFileReader(baseFile)
+ case _ => throw new UnsupportedOperationException(s"Base
file format $baseFileFormat is not supported.")
+ }
+ } else {
+ if (logFiles.nonEmpty) {
+ val baseFile = createPartitionedFile(InternalRow.empty,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
+ buildMergeOnReadIterator(
+ baseFileFormat match {
+ case "parquet" =>
preMergeParquetBaseFileReader(baseFile)
+ case "orc" => preMergeOrcBaseFileReader(baseFile)
+ case _ => throw new
UnsupportedOperationException(s"Base file format $baseFileFormat is not
supported.")
+ },
+ logFiles,
+ filePath.getParent,
+ requiredSchemaWithMandatory,
+ requiredSchemaWithMandatory,
+ outputSchema,
+ partitionSchema,
+ partitionValues,
+ broadcastedHadoopConf.value.value)
+ } else {
+ throw new IllegalStateException("should not be here since
file slice should not have been broadcasted since it has no log or base files")
+ }
+ }
+ case _ => fileFormat match {
+ case "parquet" => parquetBaseFileReader(file)
+ case "orc" => orcBaseFileReader(file)
+ case _ => throw new UnsupportedOperationException(s"Base file
format $fileFormat is not supported.")
+ }
+ }
+ }
+ case _ => fileFormat match {
+ case "parquet" => parquetBaseFileReader(file)
+ case "orc" => orcBaseFileReader(file)
+ case _ => throw new UnsupportedOperationException(s"Base file format
$fileFormat is not supported.")
+ }
+ }
+ }
+ }
+
+ /**
+ * Build file readers to read individual physical files
+ */
+ protected def buildFileReaders(sparkSession: SparkSession, dataSchema:
StructType, partitionSchema: StructType,
+ requiredSchema: StructType, filters:
Seq[Filter], options: Map[String, String],
+ hadoopConf: Configuration,
requiredSchemaWithMandatory: StructType):
+ (PartitionedFile => Iterator[InternalRow],
+ PartitionedFile => Iterator[InternalRow],
+ PartitionedFile => Iterator[InternalRow],
+ PartitionedFile => Iterator[InternalRow]) = {
+ val parquetBaseFileReader =
parquetFormat.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchema,
+ filters, options, new Configuration(hadoopConf))
+ val orcBaseFileReader =
orcFormat.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchema,
+ filters, options, new Configuration(hadoopConf))
+
+ val preMergeParquetBaseFileReader = if (isMOR) {
+ parquetFormat.buildReaderWithPartitionValues(sparkSession, dataSchema,
StructType(Seq.empty),
+ requiredSchemaWithMandatory, Seq.empty, options, new
Configuration(hadoopConf))
+ } else {
+ _: PartitionedFile => Iterator.empty
+ }
+
+ val preMergeOrcBaseFileReader = if (isMOR) {
+ orcFormat.buildReaderWithPartitionValues(sparkSession, dataSchema,
StructType(Seq.empty),
+ requiredSchemaWithMandatory, Seq.empty, options, new
Configuration(hadoopConf))
+ } else {
+ _: PartitionedFile => Iterator.empty
+ }
+
+ (parquetBaseFileReader, orcBaseFileReader, preMergeParquetBaseFileReader,
preMergeOrcBaseFileReader)
+ }
+
+ /**
+ * Create iterator for a file slice that has log files
+ */
+ protected def buildMergeOnReadIterator(iter: Iterator[InternalRow],
logFiles: List[HoodieLogFile],
+ partitionPath: Path, inputSchema:
StructType, requiredSchemaWithMandatory: StructType,
+ outputSchema: StructType,
partitionSchema: StructType, partitionValues: InternalRow,
+ hadoopConf: Configuration):
Iterator[InternalRow] = {
+
+ val requiredAvroSchema =
HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName)
+ val morIterator = mergeType match {
+ case REALTIME_SKIP_MERGE_OPT_VAL => throw new
UnsupportedOperationException("Skip merge is not currently " +
"implemented for the multiple base file format reader")
+ //new SkipMergeIterator(logFiles, partitionPath, iter, inputSchema,
tableSchema.value,
+ // requiredSchemaWithMandatory, requiredAvroSchema, tableState.value,
hadoopConf)
+ case REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
+ new RecordMergingFileIterator(logFiles, partitionPath, iter,
inputSchema, tableSchema.value,
+ requiredSchemaWithMandatory, requiredAvroSchema, tableState.value,
hadoopConf)
+ }
+ appendPartitionAndProject(morIterator, requiredSchemaWithMandatory,
partitionSchema,
+ outputSchema, partitionValues)
+ }
+
+ /**
+ * Append partition values to rows and project to output schema
+ */
+ protected def appendPartitionAndProject(iter: Iterator[InternalRow],
+ inputSchema: StructType,
+ partitionSchema: StructType,
+ to: StructType,
+ partitionValues: InternalRow):
Iterator[InternalRow] = {
+ if (partitionSchema.isEmpty) {
+ projectSchema(iter, inputSchema, to)
+ } else {
+ val unsafeProjection =
generateUnsafeProjection(StructType(inputSchema.fields ++
partitionSchema.fields), to)
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, partitionValues)))
+ }
+ }
+
+ protected def projectSchema(iter: Iterator[InternalRow],
+ from: StructType,
+ to: StructType): Iterator[InternalRow] = {
+ val unsafeProjection = generateUnsafeProjection(from, to)
+ iter.map(d => unsafeProjection(d))
+ }
+
+ protected def getLogFilesFromSlice(fileSlice: FileSlice):
List[HoodieLogFile] = {
+
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
+ }
+
+ private def detectFileFormat(filePath: String): String = {
+ // Detect the file format from the file extension of the given path.
+ if (filePath.endsWith(".parquet")) "parquet"
+ else if (filePath.endsWith(".orc")) "orc"
+ else ""
+ }
+}
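
The core of the new format boils down to extension-based detection plus per-file dispatch to the matching Spark reader. A stripped-down, runnable sketch with stub readers and hypothetical names:

object MultipleFormatDispatchSketch {
  type Reader = String => Iterator[String]

  // Same heuristic as detectFileFormat above: decide by file extension only.
  def detectFileFormat(filePath: String): String =
    if (filePath.endsWith(".parquet")) "parquet"
    else if (filePath.endsWith(".orc")) "orc"
    else ""

  // Route each file to the reader built for its format, as
  // buildReaderWithPartitionValues does for file groups without log files.
  def dispatch(parquetReader: Reader, orcReader: Reader)(filePath: String): Iterator[String] =
    detectFileFormat(filePath) match {
      case "parquet" => parquetReader(filePath)
      case "orc"     => orcReader(filePath)
      case other     => throw new UnsupportedOperationException(s"Base file format $other is not supported.")
    }

  def main(args: Array[String]): Unit = {
    val read = dispatch(p => Iterator(s"parquet: $p"), p => Iterator(s"orc: $p")) _
    println(read("f1.parquet").mkString)
    println(read("f2.orc").mkString)
  }
}
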
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index a34a6dfb052..5492d12d5fb 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -456,7 +456,7 @@ trait ProvidesHoodieConfig extends Logging {
hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key,
enableHive.toString)
hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_MODE.key,
props.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE.key,
HiveSyncMode.HMS.name()))
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_PATH,
hoodieCatalogTable.tableLocation)
- hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT,
hoodieCatalogTable.baseFileFormat)
+ hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT,
props.getString(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key,
HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue))
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_DATABASE_NAME,
hoodieCatalogTable.table.identifier.database.getOrElse("default"))
hiveSyncConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_TABLE_NAME,
hoodieCatalogTable.table.identifier.table)
if (props.get(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key) != null) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
index 5804d36ba09..b12c694ce56 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -74,7 +74,7 @@ class RepairMigratePartitionMetaProcedure extends
BaseProcedure with ProcedureBu
if (!dryRun) {
if (!baseFormatFile.isPresent) {
val partitionMetadata: HoodiePartitionMetadata = new
HoodiePartitionMetadata(metaClient.getFs, latestCommit,
- basePath, partition,
Option.of(metaClient.getTableConfig.getBaseFileFormat))
+ basePath, partition,
Option.of(getWriteConfig(basePath.toString).getBaseFileFormat))
partitionMetadata.trySave(0)
}
// delete it, in case we failed midway last time.
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index c2afc73ebac..8df34768909 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -169,7 +169,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
} else {
df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath);
}
- String filePath =
FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient,
metaClient.getFs(),
+ String filePath =
FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(),
metaClient.getFs(),
srcPath, context).stream().findAny().map(p ->
p.getValue().stream().findAny())
.orElse(null).get().getPath()).toString();
HoodieAvroParquetReader parquetReader = new
HoodieAvroParquetReader(metaClient.getHadoopConf(), new Path(filePath));
@@ -273,7 +273,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
client.getTableServiceClient().rollbackFailedBootstrap();
metaClient.reloadActiveTimeline();
assertEquals(0, metaClient.getCommitsTimeline().countInstants());
- assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient,
metaClient.getFs(), basePath, context)
+ assertEquals(0L,
BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(),
metaClient.getFs(), basePath, context)
.stream().mapToLong(f -> f.getValue().size()).sum());
BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -300,7 +300,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
String updateSPath = tmpFolder.toAbsolutePath() + "/data2";
generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords,
partitions, updateSPath);
JavaRDD<HoodieRecord> updateBatch =
- generateInputBatch(jsc,
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(),
updateSPath, context),
+ generateInputBatch(jsc,
BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(),
metaClient.getFs(), updateSPath, context),
schema);
String newInstantTs = client.startCommit();
client.upsert(updateBatch, newInstantTs);
@@ -373,7 +373,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
bootstrapped.registerTempTable("bootstrapped");
original.registerTempTable("original");
if (checkNumRawFiles) {
- List<HoodieFileStatus> files =
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(),
+ List<HoodieFileStatus> files =
BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(),
metaClient.getFs(),
bootstrapBasePath, context).stream().flatMap(x ->
x.getValue().stream()).collect(Collectors.toList());
assertEquals(files.size() * numVersions,
sqlContext.sql("select distinct _hoodie_file_name from
bootstrapped").count());
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index 54857e78eb7..abbbd78d064 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -151,7 +151,7 @@ public class TestOrcBootstrap extends
HoodieSparkClientTestBase {
} else {
df.write().format("orc").mode(SaveMode.Overwrite).save(srcPath);
}
- String filePath =
FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient,
metaClient.getFs(),
+ String filePath =
FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
metaClient.getFs(),
srcPath, context).stream().findAny().map(p ->
p.getValue().stream().findAny())
.orElse(null).get().getPath()).toString();
Reader orcReader = OrcFile.createReader(new Path(filePath),
OrcFile.readerOptions(metaClient.getHadoopConf()));
@@ -262,7 +262,7 @@ public class TestOrcBootstrap extends
HoodieSparkClientTestBase {
client.getTableServiceClient().rollbackFailedBootstrap();
metaClient.reloadActiveTimeline();
assertEquals(0, metaClient.getCommitsTimeline().countInstants());
- assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient,
metaClient.getFs(), basePath, context)
+ assertEquals(0L,
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
metaClient.getFs(), basePath, context)
.stream().flatMap(f -> f.getValue().stream()).count());
BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -289,7 +289,7 @@ public class TestOrcBootstrap extends
HoodieSparkClientTestBase {
String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords,
partitions, updateSPath);
JavaRDD<HoodieRecord> updateBatch =
- generateInputBatch(jsc,
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(),
updateSPath, context),
+ generateInputBatch(jsc,
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
metaClient.getFs(), updateSPath, context),
schema);
String newInstantTs = client.startCommit();
client.upsert(updateBatch, newInstantTs);
@@ -361,7 +361,7 @@ public class TestOrcBootstrap extends
HoodieSparkClientTestBase {
bootstrapped.registerTempTable("bootstrapped");
original.registerTempTable("original");
if (checkNumRawFiles) {
- List<HoodieFileStatus> files =
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(),
+ List<HoodieFileStatus> files =
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
metaClient.getFs(),
bootstrapBasePath, context).stream().flatMap(x ->
x.getValue().stream()).collect(Collectors.toList());
assertEquals(files.size() * numVersions,
sqlContext.sql("select distinct _hoodie_file_name from
bootstrapped").count());
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index 4a93245dc8d..28c8df82e8e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -18,16 +18,17 @@
package org.apache.hudi.testutils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -130,17 +131,14 @@ public class DataSourceTestUtils {
*/
public static boolean isLogFileOnly(String basePath) throws IOException {
Configuration conf = new Configuration();
- HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
- .setConf(conf).setBasePath(basePath)
- .build();
- String baseDataFormat =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
Path path = new Path(basePath);
FileSystem fs = path.getFileSystem(conf);
RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, true);
while (files.hasNext()) {
LocatedFileStatus file = files.next();
- if (file.isFile()) {
- if (file.getPath().toString().endsWith(baseDataFormat)) {
+ // skip meta folder
+ if (file.isFile() &&
!file.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME +
Path.SEPARATOR)) {
+ if (FSUtils.isBaseFile(file.getPath())) {
return false;
}
}
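
A loose sketch of the revised check, reimplemented over java.nio rather than the Hadoop FileSystem API used by the test utility: walk the table path, ignore anything under the .hoodie metafolder, and report false as soon as a base file is seen (an extension check stands in for FSUtils.isBaseFile).

import java.io.File
import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._

object IsLogFileOnlySketch {
  def isLogFileOnly(basePath: String): Boolean = {
    val baseExtensions = Seq(".parquet", ".orc")
    val metaFolder = File.separator + ".hoodie" + File.separator
    Files.walk(Paths.get(basePath)).iterator().asScala
      .filter(p => Files.isRegularFile(p))
      .filterNot(p => p.toString.contains(metaFolder))
      .forall(p => !baseExtensions.exists(ext => p.toString.endsWith(ext)))
  }
}
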
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieMultipleBaseFileFormat.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieMultipleBaseFileFormat.scala
new file mode 100644
index 00000000000..8995f7ec883
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieMultipleBaseFileFormat.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.config.HoodieStorageConfig
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import
org.apache.hudi.common.testutils.HoodieTestDataGenerator.{DEFAULT_FIRST_PARTITION_PATH,
DEFAULT_SECOND_PARTITION_PATH}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger,
SparkDatasetMixin}
+import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Test cases on multiple base file format support for COW and MOR table types.
+ */
+class TestHoodieMultipleBaseFileFormat extends HoodieSparkClientTestBase with
SparkDatasetMixin {
+
+ var spark: SparkSession = null
+ private val log = LoggerFactory.getLogger(classOf[TestHoodieMultipleBaseFileFormat])
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE.key -> "true",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+ )
+ val sparkOpts = Map(
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key ->
classOf[HoodieSparkRecordMerger].getName,
+ HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
+ )
+
+ val verificationCol: String = "driver"
+ val updatedVerificationVal: String = "driver_update"
+
+ @BeforeEach override def setUp() {
+ setTableName("hoodie_test")
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initTestDataGenerator()
+ initFileSystem()
+ }
+
+ @AfterEach override def tearDown() = {
+ cleanupSparkContexts()
+ cleanupTestDataGenerator()
+ cleanupFileSystem()
+ }
+
+ @Test
+ def testMultiFileFormatForCOWTableType(): Unit = {
+ insertAndValidateSnapshot(basePath, HoodieTableType.COPY_ON_WRITE.name())
+ }
+
+ @Test
+ def testMultiFileFormatForMORTableType(): Unit = {
+ insertAndValidateSnapshot(basePath, HoodieTableType.MERGE_ON_READ.name())
+ }
+
+ def insertAndValidateSnapshot(basePath: String, tableType: String): Unit = {
+ // Insert records in Parquet format to one of the partitions.
+ val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001",
10, DEFAULT_FIRST_PARTITION_PATH)).asScala
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ // Insert records to a new partition in ORC format.
+ val records2 = recordsToStrings(dataGen.generateInsertsForPartition("002",
10, DEFAULT_SECOND_PARTITION_PATH)).asScala
+ val inputDF2: Dataset[Row] =
spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ inputDF2.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key,
HoodieFileFormat.ORC.name())
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ // Snapshot Read the table
+ val hudiDf = spark.read.format("hudi").load(basePath + "/*")
+ assertEquals(0, hudiDf.count())
+
+ // Update and generate new slice across partitions.
+ val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003",
10)).asScala
+ val inputDF3: Dataset[Row] =
spark.read.json(spark.sparkContext.parallelize(records3, 2))
+ inputDF3.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ // Snapshot Read the table
+ val hudiDfAfterUpdate = spark.read.format("hudi").load(basePath + "/*")
+ assertEquals(0, hudiDfAfterUpdate.count())
+ }
+}
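
Condensed from the test above, the two options that drive the feature, as a usage sketch (assumes df: DataFrame and basePath: String are in scope and that the usual record key / precombine options are set elsewhere):

import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.spark.sql.{DataFrame, SaveMode}

object MultiFormatWriteSketch {
  // Enable multiple formats at the table level, then pick the base file format per write.
  def writeOrcIncrement(df: DataFrame, basePath: String): Unit =
    df.write.format("hudi")
      .option(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE.key, "true")
      .option(HoodieTableConfig.BASE_FILE_FORMAT.key, HoodieFileFormat.ORC.name())
      .mode(SaveMode.Append)
      .save(basePath)
}
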
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala
index 7a6cb20c849..861fd43be85 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.hudi.{HoodieBaseRelation, SparkAdapterSupport}
import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference,
AttributeSet, Expression, NamedExpression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan,
Project}
@@ -52,7 +53,7 @@ class Spark32NestedSchemaPruning extends Rule[LogicalPlan] {
// NOTE: This is modified to accommodate for Hudi's custom relations,
given that original
// [[NestedSchemaPruning]] rule is tightly coupled w/
[[HadoopFsRelation]]
// TODO generalize to any file-based relation
- l @ LogicalRelation(relation: HoodieBaseRelation, _, _, _))
+ l @ LogicalRelation(relation: HoodieBaseRelation, _, catalogTable:
Option[HoodieCatalogTable], _))
if relation.canPruneRelationSchema =>
prunePhysicalColumns(l.output, projects, filters, relation.dataSchema,
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 0626ac3960f..95534c5533f 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -671,13 +671,9 @@ public class HoodieStreamer implements Serializable {
// This will guarantee there is no surprise with table type
checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.tableType)), "Hoodie
table is of type " + tableType + " but passed in CLI argument is " +
cfg.tableType);
- // Load base file format
- // This will guarantee there is no surprise with base file type
- String baseFileFormat =
meta.getTableConfig().getBaseFileFormat().toString();
- checkArgument(baseFileFormat.equals(cfg.baseFileFormat) ||
cfg.baseFileFormat == null,
- format("Hoodie table's base file format is of type %s but passed
in CLI argument is %s", baseFileFormat, cfg.baseFileFormat));
- cfg.baseFileFormat = baseFileFormat;
- this.cfg.baseFileFormat = baseFileFormat;
+ if (cfg.baseFileFormat == null) {
+ cfg.baseFileFormat = "PARQUET"; // default for backward
compatibility
+ }
Map<String, String> propsToValidate = new HashMap<>();
properties.get().forEach((k, v) -> propsToValidate.put(k.toString(),
v.toString()));
HoodieWriterUtils.validateTableConfig(this.sparkSession,
org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate),
meta.getTableConfig());
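
Behaviorally, the streamer change replaces the strict match against the table config with a simple null fallback; roughly (a sketch, not the Java source):

def resolveBaseFileFormat(cliBaseFileFormat: String): String =
  Option(cliBaseFileFormat).getOrElse("PARQUET")
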