This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 192c3247fd57 feat(schema): Migrate log reader and partitioners to take HoodieSchema (#17548)
192c3247fd57 is described below
commit 192c3247fd5783fe58c2241d0fc18e9d3e6b2f03
Author: Tim Brown <[email protected]>
AuthorDate: Sat Dec 13 00:36:21 2025 -0500
feat(schema): Migrate log reader and partitioners to take HoodieSchema (#17548)
---
.../hudi/cli/commands/ArchivedCommitsCommand.java | 5 +-
.../apache/hudi/cli/commands/ExportCommand.java | 3 +-
.../hudi/cli/commands/HoodieLogFileCommand.java | 4 +-
.../cli/commands/TestHoodieLogFileCommand.java | 2 +-
.../utils/LegacyArchivedMetaEntryReader.java | 3 +-
.../hudi/client/TestJavaHoodieBackedMetadata.java | 4 +-
.../MultipleSparkJobExecutionStrategy.java | 10 +-
.../SparkSingleFileSortExecutionStrategy.java | 5 +-
.../SparkSortAndSizeExecutionStrategy.java | 4 +-
.../RDDCustomColumnsSortPartitioner.java | 13 +-
.../bulkinsert/RDDSpatialCurveSortPartitioner.java | 15 ++-
.../functional/TestHoodieBackedTableMetadata.java | 2 +-
.../table/log/AbstractHoodieLogRecordScanner.java | 7 +-
.../table/log/BaseHoodieLogRecordReader.java | 4 +-
.../table/log/HoodieCDCLogRecordIterator.java | 6 +-
.../hudi/common/table/log/HoodieLogFileReader.java | 21 ++--
.../hudi/common/table/log/HoodieLogFormat.java | 6 +-
.../common/table/log/HoodieLogFormatReader.java | 6 +-
.../table/log/HoodieLogFormatReverseReader.java | 135 ---------------------
.../table/log/HoodieMergedLogRecordScanner.java | 18 ++-
.../table/log/HoodieUnMergedLogRecordScanner.java | 9 +-
.../table/log/block/HoodieAvroDataBlock.java | 17 ++-
.../versioning/v1/ArchivedTimelineLoaderV1.java | 3 +-
.../org/apache/hudi/common/util/SortUtils.java | 7 +-
.../hudi/table/format/cdc/CdcInputFormat.java | 2 +-
.../common/functional/TestHoodieLogFormat.java | 130 ++++++++++----------
.../table/timeline/TestArchivedTimelineV1.java | 3 +-
.../common/testutils/HoodieCommonTestHarness.java | 9 +-
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 2 +-
.../realtime/RealtimeCompactedRecordReader.java | 2 +-
.../realtime/RealtimeUnmergedRecordReader.java | 3 +-
.../reader/DFSHoodieDatasetInputReader.java | 2 +-
.../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 2 +-
.../procedures/ExportInstantsProcedure.scala | 3 +-
.../ShowHoodieLogFileMetadataProcedure.scala | 3 +-
.../ShowHoodieLogFileRecordsProcedure.scala | 7 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 2 +-
.../TestBulkInsertInternalPartitioner.java | 4 +-
.../hudi/functional/TestHoodieBackedMetadata.java | 4 +-
.../java/org/apache/hudi/io/TestMergeHandle.java | 2 +-
.../hudi/table/TestHoodieMergeOnReadTable.java | 4 +-
.../hudi/functional/TestMORDataSourceStorage.scala | 2 +-
.../hudi/functional/cdc/HoodieCDCTestBase.scala | 2 +-
.../sql/hudi/common/HoodieSparkSqlTestBase.scala | 4 +-
.../dml/others/TestPartialUpdateForMergeInto.scala | 11 +-
.../utilities/HoodieMetadataTableValidator.java | 2 +-
46 files changed, 192 insertions(+), 322 deletions(-)
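
Reader's note on the overall pattern: HoodieLogFormat.newReader and the log record scanner builders in this patch now take a HoodieSchema instead of a raw Avro Schema. A minimal sketch of the new reading pattern, assuming the caller already holds a HoodieStorage, a HoodieLogFile and an Avro writer schema (the helper class and variable names below are illustrative and not part of the patch):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieLogFile;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.table.log.HoodieLogFormat;
    import org.apache.hudi.common.table.log.block.HoodieLogBlock;
    import org.apache.hudi.storage.HoodieStorage;

    import java.io.IOException;

    final class LogReadSketch {
      static void readAllBlocks(HoodieStorage storage, HoodieLogFile logFile, Schema avroSchema) throws IOException {
        // Before this change the Avro schema was passed directly; now it is wrapped first.
        HoodieSchema readerSchema = HoodieSchema.fromAvroSchema(avroSchema);
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, logFile, readerSchema)) {
          while (reader.hasNext()) {
            HoodieLogBlock block = reader.next();
            // process the block ...
          }
        }
      }
    }
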
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 3f309326f357..c451db21dfa9 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -29,6 +29,7 @@ import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -116,7 +117,7 @@ public class ArchivedCommitsCommand {
for (StoragePathInfo pathInfo : pathInfoList) {
// read the archived file
try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(pathInfo.getPath()),
- HoodieArchivedMetaEntry.getClassSchema())) {
+ HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
List<IndexedRecord> readRecords = new ArrayList<>();
// read the avro blocks
while (reader.hasNext()) {
@@ -190,7 +191,7 @@ public class ArchivedCommitsCommand {
for (StoragePathInfo pathInfo : pathInfoList) {
// read the archived file
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(HoodieStorageUtils.getStorage(basePath, HoodieCLI.conf),
- new HoodieLogFile(pathInfo.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+ new HoodieLogFile(pathInfo.getPath()), HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
List<IndexedRecord> readRecords = new ArrayList<>();
// read the avro blocks
while (reader.hasNext()) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index f9db200d99bb..8692efb8f6ba 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -27,6 +27,7 @@ import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -130,7 +131,7 @@ public class ExportCommand {
for (StoragePathInfo pathInfo : pathInfoList) {
// read the archived file
- try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(pathInfo.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+ try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(pathInfo.getPath()), HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
// read the avro blocks
while (reader.hasNext() && copyCount++ < limit) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index a469ce23d00f..ffafaed6ce60 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -116,7 +116,7 @@ public class HoodieLogFileCommand {
} else {
fileName = path.getName();
}
- Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, path);
+ HoodieSchema writerSchema = HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage, path));
try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(path), writerSchema)) {
// read the avro blocks
@@ -265,7 +265,7 @@ public class HoodieLogFileCommand {
Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(
client.getStorage(), new StoragePath(logFile));
try (HoodieLogFormat.Reader reader =
- HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), writerSchema)) {
+ HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), HoodieSchema.fromAvroSchema(writerSchema))) {
// read the avro blocks
while (reader.hasNext()) {
HoodieLogBlock n = reader.next();
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index fe4ea1c8ffe1..e30c0d5ccc84 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -238,7 +238,7 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
.withStorage(storage)
.withBasePath(tablePath)
.withLogFilePaths(logFilePaths)
- .withReaderSchema(schema.toAvroSchema())
+ .withReaderSchema(schema)
.withLatestInstantTime(INSTANT_TIME)
.withMaxMemorySizeInBytes(
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
index 6e81266ba93a..1b445fa734da 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.client.timeline.ActiveActionWithDetails;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -268,7 +269,7 @@ public class LegacyArchivedMetaEntryReader {
reader = HoodieLogFormat.newReader(
metaClient.getStorage(),
new HoodieLogFile(pathInfo.getPath()),
- HoodieArchivedMetaEntry.getClassSchema());
+ HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()));
} catch (IOException ioe) {
throw new HoodieIOException(
"Error initializing the reader for archived log: " +
pathInfo.getPath(), ioe);
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 6fdb5b3b4757..bc3a6867dbe3 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -902,7 +902,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
}
try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
- new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+ new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
@@ -2886,7 +2886,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
}
try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
- new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+ new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index d34dd06bbeb1..6d30eaf2b7ca 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -179,12 +179,12 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
final Map<String, String> extraMetadata);
protected BulkInsertPartitioner<Dataset<Row>> getRowPartitioner(Map<String, String> strategyParams,
- Schema schema) {
+ HoodieSchema schema) {
return getPartitioner(strategyParams, schema, true);
}
protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getRDDPartitioner(Map<String, String> strategyParams,
- Schema schema) {
+ HoodieSchema schema) {
return getPartitioner(strategyParams, schema, false);
}
@@ -195,7 +195,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
* @param schema Schema of the data including metadata fields.
*/
private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams,
- Schema schema,
+ HoodieSchema schema,
boolean isRowPartitioner) {
Option<String[]> orderByColumnsOpt = Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
@@ -209,11 +209,11 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
return isRowPartitioner
? new RowSpatialCurveSortPartitioner(getWriteConfig())
: new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,
- getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), recordType);
+ getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieSchemaUtils.addMetadataFields(schema), recordType);
case LINEAR:
return isRowPartitioner
? new RowCustomColumnsSortPartitioner(orderByColumns, getWriteConfig())
- : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
+ : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieSchemaUtils.addMetadataFields(schema), getWriteConfig());
default:
throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index b3f640c646ba..c3592d30beae 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -79,7 +79,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
// Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
- BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema.toAvroSchema());
+ BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);
Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
@@ -107,6 +107,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
- false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), shouldPreserveHoodieMetadata));
+ false, getRDDPartitioner(strategyParams, HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups,
+ new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), shouldPreserveHoodieMetadata));
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index 6023c36f0f5c..000915daee4a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -71,7 +71,7 @@ public class SparkSortAndSizeExecutionStrategy<T>
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
- BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema.toAvroSchema());
+ BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);
Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
@@ -96,6 +96,6 @@ public class SparkSortAndSizeExecutionStrategy<T>
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(),
- newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
+ newConfig, false, getRDDPartitioner(strategyParams, HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index 4e1b7ff774d4..bb0ba6ff3770 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -19,13 +19,12 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.SortUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.BulkInsertPartitioner;
-import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.HoodieUTF8StringFactory;
@@ -43,22 +42,22 @@ public class RDDCustomColumnsSortPartitioner<T>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
private final String[] sortColumnNames;
- private final SerializableSchema serializableSchema;
+ private final HoodieSchema schema;
private final boolean consistentLogicalTimestampEnabled;
private final boolean suffixRecordKey;
private final HoodieUTF8StringFactory utf8StringFactory =
SparkAdapterSupport$.MODULE$.sparkAdapter().getUTF8StringFactory();
public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
- this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
+ this.schema = HoodieSchema.parse(config.getSchema());
this.sortColumnNames = getSortColumnName(config);
this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
this.suffixRecordKey = config.getBoolean(BULKINSERT_SUFFIX_RECORD_KEY_SORT_COLUMNS);
}
- public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) {
+ public RDDCustomColumnsSortPartitioner(String[] columnNames, HoodieSchema schema, HoodieWriteConfig config) {
this.sortColumnNames = columnNames;
- this.serializableSchema = new SerializableSchema(schema);
+ this.schema = schema;
this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
this.suffixRecordKey = config.getBoolean(BULKINSERT_SUFFIX_RECORD_KEY_SORT_COLUMNS);
}
@@ -67,7 +66,7 @@ public class RDDCustomColumnsSortPartitioner<T>
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
int outputSparkPartitions) {
return records
- .sortBy(record -> SortUtils.getComparableSortColumns(record, sortColumnNames, serializableSchema.get(), suffixRecordKey, consistentLogicalTimestampEnabled,
+ .sortBy(record -> SortUtils.getComparableSortColumns(record, sortColumnNames, schema, suffixRecordKey, consistentLogicalTimestampEnabled,
utf8StringFactory::wrapArrayOfObjects), true, outputSparkPartitions);
}
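
Reader's note on the partitioner side of this change: the constructors now accept HoodieSchema, and the write-config schema string is parsed with HoodieSchema.parse rather than going through SerializableSchema and an Avro Schema.Parser. A small sketch of constructing the custom-column sort partitioner under the new signature (the build helper and its generic parameter are illustrative):

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;

    final class PartitionerSketch {
      static RDDCustomColumnsSortPartitioner<Object> build(HoodieWriteConfig writeConfig, String[] sortColumns) {
        // The schema JSON carried by the write config parses directly into a HoodieSchema.
        HoodieSchema schema = HoodieSchema.parse(writeConfig.getSchema());
        return new RDDCustomColumnsSortPartitioner<>(sortColumns, schema, writeConfig);
      }
    }
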
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 9840f6ee52dd..327eb8919529 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -22,17 +22,16 @@ import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkConversionUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
@@ -51,18 +50,18 @@ public class RDDSpatialCurveSortPartitioner<T>
extends SpatialCurveSortPartitionerBase<JavaRDD<HoodieRecord<T>>> {
private final transient HoodieSparkEngineContext sparkEngineContext;
- private final SerializableSchema schema;
+ private final HoodieSchema schema;
private final HoodieRecordType recordType;
public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContext,
String[] orderByColumns,
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType,
- Schema schema,
+ HoodieSchema schema,
HoodieRecordType recordType) {
super(orderByColumns, layoutOptStrategy, curveCompositionStrategyType);
this.sparkEngineContext = sparkEngineContext;
- this.schema = new SerializableSchema(schema);
+ this.schema = schema;
this.recordType = recordType;
}
@@ -70,7 +69,7 @@ public class RDDSpatialCurveSortPartitioner<T>
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
if (recordType == HoodieRecordType.AVRO) {
JavaRDD<GenericRecord> genericRecordsRDD =
- records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new Properties()).get().getData());
+ records.map(f -> (GenericRecord) f.toIndexedRecord(schema.toAvroSchema(), new Properties()).get().getData());
Dataset<Row> sourceDataset =
AvroConversionUtils.createDataFrame(
@@ -80,7 +79,7 @@ public class RDDSpatialCurveSortPartitioner<T>
);
Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
- return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty())
+ return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace().orElse(null), false, Option.empty())
.toJavaRDD()
.map(record -> {
String key =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -90,7 +89,7 @@ public class RDDSpatialCurveSortPartitioner<T>
return hoodieRecord;
});
} else if (recordType == HoodieRecordType.SPARK) {
- StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.get());
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema());
Dataset<Row> sourceDataset =
SparkConversionUtils.createDataFrame(records.rdd(),
sparkEngineContext.getSqlContext().sparkSession(), structType);
Dataset<Row> sortedDataset = reorder(sourceDataset,
outputSparkPartitions);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 8ae72764c122..a35a7c3ce456 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -530,7 +530,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
}
try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
- new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+ new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 33e723548031..2a44a3e02a66 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -92,7 +93,7 @@ public abstract class AbstractHoodieLogRecordScanner {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractHoodieLogRecordScanner.class);
// Reader schema for the records
- protected final Schema readerSchema;
+ protected final HoodieSchema readerSchema;
// Latest valid instant time
// Log-Blocks belonging to inflight delta-instants are filtered-out using
this high-watermark.
private final String latestInstantTime;
@@ -155,7 +156,7 @@ public abstract class AbstractHoodieLogRecordScanner {
private HoodieTimeline inflightInstantsTimeline = null;
protected AbstractHoodieLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths,
- Schema readerSchema, String latestInstantTime,
+ HoodieSchema readerSchema, String latestInstantTime,
boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
boolean withOperationField, boolean forceFullScan,
Option<String> partitionNameOverride,
@@ -867,7 +868,7 @@ public abstract class AbstractHoodieLogRecordScanner {
public abstract Builder withLogFilePaths(List<String> logFilePaths);
- public abstract Builder withReaderSchema(Schema schema);
+ public abstract Builder withReaderSchema(HoodieSchema schema);
public abstract Builder withInternalSchema(InternalSchema internalSchema);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 33c0f7f340e0..3b721c481a00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -215,7 +215,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles,
- readerSchema.toAvroSchema(), reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
+ readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
@@ -377,7 +377,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles,
- readerSchema == null ? null : readerSchema.toAvroSchema(), reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
+ readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
/**
* Scanning log blocks and placing the compacted blocks at the right
place require two traversals.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
index 4d2417f9851e..b1ccb6019465 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
@@ -21,13 +21,13 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieStorage;
-import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
@@ -41,7 +41,7 @@ public class HoodieCDCLogRecordIterator implements ClosableIterator<IndexedRecor
private final HoodieStorage storage;
- private final Schema cdcSchema;
+ private final HoodieSchema cdcSchema;
private final Iterator<HoodieLogFile> cdcLogFileIter;
@@ -51,7 +51,7 @@ public class HoodieCDCLogRecordIterator implements ClosableIterator<IndexedRecor
private IndexedRecord record;
- public HoodieCDCLogRecordIterator(HoodieStorage storage, HoodieLogFile[] cdcLogFiles, Schema cdcSchema) {
+ public HoodieCDCLogRecordIterator(HoodieStorage storage, HoodieLogFile[] cdcLogFiles, HoodieSchema cdcSchema) {
this.storage = storage;
this.cdcSchema = cdcSchema;
this.cdcLogFileIter = Arrays.stream(cdcLogFiles).iterator();
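
Reader's note: HoodieCDCLogRecordIterator now receives the wrapped schema as well, so Flink's CdcInputFormat (further down in this patch) no longer calls cdcSchema.toAvroSchema() at the call site. A minimal iteration sketch, assuming the caller already holds the CDC log files and schema (the helper method is illustrative):

    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.model.HoodieLogFile;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
    import org.apache.hudi.storage.HoodieStorage;

    final class CdcIterationSketch {
      static void printCdcRecords(HoodieStorage storage, HoodieLogFile[] cdcLogFiles, HoodieSchema cdcSchema) throws Exception {
        // The iterator takes the HoodieSchema directly after this change.
        try (HoodieCDCLogRecordIterator iterator = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema)) {
          while (iterator.hasNext()) {
            IndexedRecord record = iterator.next();
            System.out.println(record);
          }
        }
      }
    }
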
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 0172731693bc..2c1d91e7b7c1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -44,7 +44,6 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StorageSchemes;
-import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +74,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private final HoodieLogFile logFile;
private final int bufferSize;
private final byte[] magicBuffer = new byte[6];
- private final Schema readerSchema;
+ private final HoodieSchema readerSchema;
private final InternalSchema internalSchema;
private final String keyField;
private long reverseLogFilePosition;
@@ -85,21 +84,21 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private boolean closed = false;
private final SeekableDataInputStream inputStream;
- public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize) throws IOException {
+ public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema, int bufferSize) throws IOException {
this(storage, logFile, readerSchema, bufferSize, false);
}
- public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
+ public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema, int bufferSize,
boolean reverseReader) throws IOException {
this(storage, logFile, readerSchema, bufferSize, reverseReader, false, HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
- public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean reverseReader,
+ public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema, int bufferSize, boolean reverseReader,
boolean enableRecordLookups, String keyField)
throws IOException {
this(storage, logFile, readerSchema, bufferSize, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
}
- public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean reverseReader,
+ public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema, int bufferSize, boolean reverseReader,
boolean enableRecordLookups, String keyField, InternalSchema internalSchema) throws IOException {
this.storage = storage;
// NOTE: We repackage {@code HoodieLogFile} here to make sure that the
provided path
@@ -190,7 +189,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, internalSchema);
} else {
return new HoodieAvroDataBlock(() -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc,
- getTargetReaderSchemaForBlock().isEmpty() ? Option.empty() : Option.of(HoodieSchema.fromAvroSchema(getTargetReaderSchemaForBlock().get())), header, footer, keyField);
+ getTargetReaderSchemaForBlock(), header, footer, keyField);
}
case HFILE_DATA_BLOCK:
@@ -198,14 +197,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
return new HoodieHFileDataBlock(
() -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc,
- Option.ofNullable(HoodieSchema.fromAvroSchema(readerSchema)), header, footer, enableRecordLookups, logFile.getPath());
+ Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath());
case PARQUET_DATA_BLOCK:
checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
return new HoodieParquetDataBlock(() -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc,
- getTargetReaderSchemaForBlock().isEmpty() ? Option.empty() : Option.of(HoodieSchema.fromAvroSchema(getTargetReaderSchemaForBlock().get())), header, footer, keyField);
+ getTargetReaderSchemaForBlock(), header, footer, keyField);
case DELETE_BLOCK:
return new HoodieDeleteBlock(content, () -> getDataInputStream(storage, this.logFile, bufferSize), true, Option.of(logBlockContentLoc), header, footer);
@@ -214,14 +213,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
return new HoodieCommandBlock(content, () -> getDataInputStream(storage, this.logFile, bufferSize), true, Option.of(logBlockContentLoc), header, footer);
case CDC_DATA_BLOCK:
- return new HoodieCDCDataBlock(() -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc, HoodieSchema.fromAvroSchema(readerSchema), header, keyField);
+ return new HoodieCDCDataBlock(() -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc, readerSchema, header, keyField);
default:
throw new HoodieNotSupportedException("Unsupported Block " + blockType);
}
}
- private Option<Schema> getTargetReaderSchemaForBlock() {
+ private Option<HoodieSchema> getTargetReaderSchemaForBlock() {
// we should use write schema to read log file,
// since when we have done some DDL operation, the readerSchema maybe different from writeSchema, avro reader will throw exception.
// eg: origin writeSchema is: "a String, b double" then we add a new column now the readerSchema will be: "a string, c int, b double". it's wrong to use readerSchema to read old log file.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index edc5cd845d91..bfe17c7c5bb5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.Option;
@@ -28,7 +29,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -297,12 +297,12 @@ public interface HoodieLogFormat {
return new WriterBuilder();
}
- static HoodieLogFormat.Reader newReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema)
+ static HoodieLogFormat.Reader newReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema)
throws IOException {
return new HoodieLogFileReader(storage, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE);
}
- static HoodieLogFormat.Reader newReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, boolean reverseReader) throws IOException {
+ static HoodieLogFormat.Reader newReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema, boolean reverseReader) throws IOException {
return new HoodieLogFileReader(storage, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, reverseReader);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 0d3c99a732d5..77c3e78fcc32 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -19,12 +19,12 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
-import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +39,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final List<HoodieLogFile> logFiles;
private HoodieLogFileReader currentReader;
private final HoodieStorage storage;
- private final Schema readerSchema;
+ private final HoodieSchema readerSchema;
private final InternalSchema internalSchema;
private final String recordKeyField;
private final boolean enableInlineReading;
@@ -47,7 +47,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatReader.class);
- HoodieLogFormatReader(HoodieStorage storage, List<HoodieLogFile> logFiles, Schema readerSchema,
+ HoodieLogFormatReader(HoodieStorage storage, List<HoodieLogFile> logFiles, HoodieSchema readerSchema,
boolean reverseLogReader, int bufferSize, boolean enableRecordLookups,
String recordKeyField, InternalSchema internalSchema) throws IOException {
this.logFiles = logFiles;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReverseReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReverseReader.java
deleted file mode 100644
index b746eabd7324..000000000000
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReverseReader.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.table.log;
-
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.storage.HoodieStorage;
-
-import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A log format reader by reading log files and blocks in reserve order.
- * This reader assumes that each log file only has one log block.
- */
-public class HoodieLogFormatReverseReader implements HoodieLogFormat.Reader {
- private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatReader.class);
- private final List<HoodieLogFile> logFiles;
- // Readers for previously scanned log-files that are still open
- private final List<HoodieLogFileReader> prevReadersInOpenState;
- private HoodieLogFileReader currentReader;
- private final HoodieStorage storage;
- private final Schema readerSchema;
- private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
- private final boolean reverseLogReader;
- private final String recordKeyField;
- private final boolean enableInlineReading;
- private final int bufferSize;
- private int logFilePos = -1;
-
- HoodieLogFormatReverseReader(HoodieStorage storage, List<HoodieLogFile> logFiles, Schema readerSchema,
- boolean reverseLogReader, int bufferSize, boolean enableRecordLookups,
- String recordKeyField, InternalSchema internalSchema) throws IOException {
- this.logFiles = logFiles;
- this.storage = storage;
- this.readerSchema = readerSchema;
- this.reverseLogReader = reverseLogReader;
- this.bufferSize = bufferSize;
- this.prevReadersInOpenState = new ArrayList<>();
- this.recordKeyField = recordKeyField;
- this.enableInlineReading = enableRecordLookups;
- this.internalSchema =
- internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
- logFilePos = logFiles.size() - 1;
- if (logFilePos >= 0) {
- HoodieLogFile nextLogFile = logFiles.get(logFilePos);
- logFilePos--;
- this.currentReader = new HoodieLogFileReader(storage, nextLogFile, readerSchema, bufferSize, false,
- enableRecordLookups, recordKeyField, internalSchema);
- }
- }
-
- @Override
- public void close() throws IOException {
- for (HoodieLogFileReader reader : prevReadersInOpenState) {
- reader.close();
- }
-
- prevReadersInOpenState.clear();
-
- if (currentReader != null) {
- currentReader.close();
- }
- }
-
- @Override
- public boolean hasNext() {
- if (currentReader == null) {
- return false;
- } else if (currentReader.hasNext()) {
- return true;
- } else if (logFilePos >= 0) {
- try {
- HoodieLogFile nextLogFile = logFiles.get(logFilePos);
- logFilePos--;
- this.prevReadersInOpenState.add(currentReader);
- this.currentReader = new HoodieLogFileReader(storage, nextLogFile, readerSchema, bufferSize, false,
- enableInlineReading, recordKeyField, internalSchema);
- } catch (IOException io) {
- throw new HoodieIOException("unable to initialize read with log file ", io);
- }
- LOG.info("Moving to the next reader for logfile {}", currentReader.getLogFile());
- return hasNext();
- }
- return false;
- }
-
- @Override
- public HoodieLogBlock next() {
- return currentReader.next();
- }
-
- @Override
- public HoodieLogFile getLogFile() {
- return currentReader.getLogFile();
- }
-
- @Override
- public void remove() {
- }
-
- @Override
- public boolean hasPrev() {
- return this.currentReader.hasPrev();
- }
-
- @Override
- public HoodieLogBlock prev() throws IOException {
- return this.currentReader.prev();
- }
-}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index ef937241135a..6460ee0fa86f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -47,7 +47,6 @@ import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,7 +100,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
private final DeleteContext deleteContext;
@SuppressWarnings("unchecked")
- protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
+ protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, HoodieSchema readerSchema,
String latestInstantTime, Long maxMemorySizeInBytes,
boolean reverseReader, int bufferSize, String spillableMapBasePath,
Option<InstantRange> instantRange,
@@ -121,13 +120,13 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
- new HoodieRecordSizeEstimator(readerSchema), diskMapType, new DefaultSerializer<>(), isBitCaskDiskMapCompressionEnabled, getClass().getSimpleName());
+ new HoodieRecordSizeEstimator(readerSchema.toAvroSchema()), diskMapType, new DefaultSerializer<>(), isBitCaskDiskMapCompressionEnabled, getClass().getSimpleName());
this.scannedPrefixes = new HashSet<>();
this.allowInflightInstants = allowInflightInstants;
this.orderingFields = ConfigUtils.getOrderingFields(this.hoodieTableMetaClient.getTableConfig().getProps());
TypedProperties mergeProps = ConfigUtils.getMergeProps(getPayloadProps(), this.hoodieTableMetaClient.getTableConfig());
- HoodieSchema readerHoodieSchema = HoodieSchema.fromAvroSchema(readerSchema);
- this.deleteContext = new DeleteContext(mergeProps, readerHoodieSchema).withReaderSchema(readerHoodieSchema);
+ this.deleteContext = new DeleteContext(mergeProps, readerSchema).withReaderSchema(readerSchema);
} catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
}
@@ -260,9 +258,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
if (prevRecord != null) {
// Merge and store the combined record
RecordContext recordContext = AvroRecordContext.getFieldAccessorInstance();
- BufferedRecord<T> prevBufferedRecord = BufferedRecords.fromHoodieRecord(prevRecord, HoodieSchema.fromAvroSchema(readerSchema),
+ BufferedRecord<T> prevBufferedRecord = BufferedRecords.fromHoodieRecord(prevRecord, readerSchema,
recordContext, this.getPayloadProps(), orderingFields, deleteContext);
- BufferedRecord<T> newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, HoodieSchema.fromAvroSchema(readerSchema),
+ BufferedRecord<T> newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, readerSchema,
recordContext, this.getPayloadProps(), orderingFields, deleteContext);
BufferedRecord<T> combinedRecord = recordMerger.merge(prevBufferedRecord, newBufferedRecord, recordContext, this.getPayloadProps());
// If pre-combine returns existing record, no need to update it
@@ -293,7 +291,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
// should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
// For same ordering values, uses the natural order(arrival time semantics).
- Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps(), orderingFields);
+ Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema.toAvroSchema(), this.hoodieTableMetaClient.getTableConfig().getProps(), orderingFields);
Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
// Checks the ordering value does not equal to 0
// because we use 0 as the default value which means natural order
@@ -343,7 +341,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
private HoodieStorage storage;
private String basePath;
private List<String> logFilePaths;
- private Schema readerSchema;
+ private HoodieSchema readerSchema;
private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
private String latestInstantTime;
private boolean reverseReader;
@@ -393,7 +391,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
}
@Override
- public Builder withReaderSchema(Schema schema) {
+ public Builder withReaderSchema(HoodieSchema schema) {
this.readerSchema = schema;
return this;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 7159cfda5799..941d11ab94b2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -32,8 +33,6 @@ import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.apache.avro.Schema;
-
import java.util.List;
import java.util.stream.Collectors;
@@ -45,7 +44,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
private final LogRecordScannerCallback callback;
private final RecordDeletionCallback recordDeletionCallback;
- private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
+ private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, HoodieSchema readerSchema,
String latestInstantTime, boolean reverseReader, int bufferSize,
LogRecordScannerCallback callback, RecordDeletionCallback recordDeletionCallback,
Option<InstantRange> instantRange, InternalSchema internalSchema,
@@ -118,7 +117,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
private HoodieStorage storage;
private String basePath;
private List<String> logFilePaths;
- private Schema readerSchema;
+ private HoodieSchema readerSchema;
private InternalSchema internalSchema;
private String latestInstantTime;
private boolean reverseReader;
@@ -154,7 +153,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
return this;
}
- public Builder withReaderSchema(Schema schema) {
+ public Builder withReaderSchema(HoodieSchema schema) {
this.readerSchema = schema;
return this;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 6efc9b60c8d0..26b8cbde6c79 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -412,11 +412,11 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
* HoodieLogFormat V1.
*/
@Deprecated
- public HoodieAvroDataBlock(List<HoodieRecord> records, Schema schema) {
+ public HoodieAvroDataBlock(List<HoodieRecord> records, HoodieSchema schema) {
super(records, Collections.singletonMap(HeaderMetadataType.SCHEMA, schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
- public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
+ public static HoodieAvroDataBlock getBlock(byte[] content, HoodieSchema readerSchema) throws IOException {
return getBlock(content, readerSchema, InternalSchema.getEmptyInternalSchema());
}
@@ -425,7 +425,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
* HoodieLogFormat V1.
*/
@Deprecated
- public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema, InternalSchema internalSchema) throws IOException {
+ public static HoodieAvroDataBlock getBlock(byte[] content, HoodieSchema readerSchema, InternalSchema internalSchema) throws IOException {
SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content)));
@@ -435,15 +435,12 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
dis.readFully(compressedSchema, 0, schemaLength);
Schema writerSchema = new Schema.Parser().parse(decompress(compressedSchema));
- if (readerSchema == null) {
- readerSchema = writerSchema;
+ if (readerSchema == null || !internalSchema.isEmptySchema()) {
+ readerSchema = HoodieSchema.fromAvroSchema(writerSchema);
}
- if (!internalSchema.isEmptySchema()) {
- readerSchema = writerSchema;
- }
-
- GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
+ Schema readerAvroSchema = readerSchema.toAvroSchema();
+ GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerAvroSchema);
// 2. Get the total records
int totalRecords = dis.readInt();
List<HoodieRecord> records = new ArrayList<>(totalRecords);
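
Reader's note: the reader-schema fallback inside HoodieAvroDataBlock.getBlock is now a single condition, with the writer schema winning whenever no reader schema was supplied or schema evolution (a non-empty InternalSchema) is involved. The equivalent logic, pulled out into a standalone sketch for readability (the helper class is illustrative):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.internal.schema.InternalSchema;

    final class SchemaFallbackSketch {
      static HoodieSchema resolveReaderSchema(HoodieSchema readerSchema, Schema writerSchema, InternalSchema internalSchema) {
        // Fall back to the writer schema when no reader schema is given or an evolved schema is in play.
        if (readerSchema == null || !internalSchema.isEmptySchema()) {
          return HoodieSchema.fromAvroSchema(writerSchema);
        }
        return readerSchema;
      }
    }
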
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineLoaderV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineLoaderV1.java
index 5e109ba64801..5f97ac4f3f5e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineLoaderV1.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineLoaderV1.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -115,7 +116,7 @@ public class ArchivedTimelineLoaderV1 implements
ArchivedTimelineLoader {
}
// Read the archived file
try (HoodieLogFormat.Reader reader =
HoodieLogFormat.newReader(metaClient.getStorage(),
- new HoodieLogFile(fs.getPath()),
HoodieArchivedMetaEntry.getClassSchema())) {
+ new HoodieLogFile(fs.getPath()),
HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
int instantsInPreviousFile = instantsInRange.size();
// Read the avro blocks
while (reader.hasNext() && (!hasLimit || loadedCount.get() <
limit.get())) {
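
For reference, a sketch of the archived-timeline read path after this signature change: the generated Avro class schema is wrapped once at the call site. `storage` and `archivedFilePath` are placeholders:

    import java.io.IOException;

    import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
    import org.apache.hudi.common.model.HoodieLogFile;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.table.log.HoodieLogFormat;
    import org.apache.hudi.common.table.log.block.HoodieLogBlock;
    import org.apache.hudi.storage.HoodieStorage;
    import org.apache.hudi.storage.StoragePath;

    class ArchivedLogReadSketch {
      void readArchivedFile(HoodieStorage storage, StoragePath archivedFilePath) throws IOException {
        HoodieSchema readerSchema =
            HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema());
        try (HoodieLogFormat.Reader reader =
                 HoodieLogFormat.newReader(storage, new HoodieLogFile(archivedFilePath), readerSchema)) {
          while (reader.hasNext()) {
            HoodieLogBlock block = reader.next();
            // process the avro blocks in the archived file
          }
        }
      }
    }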
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
index 159abe61c2ae..c043f7a2fef0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.util;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.FlatLists;
import org.apache.avro.Schema;
@@ -90,13 +91,13 @@ public class SortUtils {
public static FlatLists.ComparableList<Comparable<HoodieRecord>>
getComparableSortColumns(
HoodieRecord record,
String[] sortColumnNames,
- Schema schema,
+ HoodieSchema schema,
boolean suffixRecordKey,
boolean consistentLogicalTimestampEnabled,
Function<Object[], Object[]> wrapUTF8StringFunc
) {
if (record.getRecordType() == HoodieRecord.HoodieRecordType.SPARK) {
- Object[] columnValues = record.getColumnValues(schema, sortColumnNames,
consistentLogicalTimestampEnabled);
+ Object[] columnValues = record.getColumnValues(schema.toAvroSchema(),
sortColumnNames, consistentLogicalTimestampEnabled);
if (suffixRecordKey) {
return FlatLists.ofComparableArray(wrapUTF8StringFunc.apply(
prependPartitionPathAndSuffixRecordKey(record.getPartitionPath(),
record.getRecordKey(), columnValues)));
@@ -105,7 +106,7 @@ public class SortUtils {
} else if (record.getRecordType() == HoodieRecord.HoodieRecordType.AVRO) {
return FlatLists.ofComparableArray(wrapUTF8StringFunc.apply(
HoodieAvroUtils.getSortColumnValuesWithPartitionPathAndRecordKey(
- record, sortColumnNames, schema, suffixRecordKey,
consistentLogicalTimestampEnabled
+ record, sortColumnNames, schema.toAvroSchema(), suffixRecordKey,
consistentLogicalTimestampEnabled
)));
}
throw new IllegalArgumentException("Invalid recordType" +
record.getRecordType());
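
A caller-side sketch of the updated SortUtils signature: callers now pass HoodieSchema and SortUtils converts to Avro internally where the record APIs still require it. `record` and `tableSchema` are placeholders, the sort column is hypothetical, and the identity function stands in for the UTF8-wrapping hook used by the Spark path:

    import java.util.function.Function;

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.util.SortUtils;
    import org.apache.hudi.common.util.collection.FlatLists;

    class SortColumnsSketch {
      FlatLists.ComparableList<Comparable<HoodieRecord>> sortKey(HoodieRecord record, HoodieSchema tableSchema) {
        String[] sortColumns = new String[] {"rider"};           // hypothetical sort column
        Function<Object[], Object[]> identity = values -> values; // no UTF8 wrapping in this sketch
        return SortUtils.getComparableSortColumns(
            record, sortColumns, tableSchema,
            true /* suffixRecordKey */, false /* consistentLogicalTimestampEnabled */, identity);
      }
    }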
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 17ea108514a4..4070008d8d51 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -509,7 +509,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
throw new HoodieIOException("Fail to call getFileStatus", e);
}
}).toArray(HoodieLogFile[]::new);
- this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles,
cdcSchema.toAvroSchema());
+ this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles,
cdcSchema);
}
private int[] getRequiredPos(String tableSchema, HoodieSchema required) {
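
A sketch of constructing the CDC log iterator after this change: the CDC schema stays a HoodieSchema and is no longer converted at the call site. Variable names are placeholders:

    import org.apache.hudi.common.model.HoodieLogFile;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
    import org.apache.hudi.storage.HoodieStorage;

    class CdcIteratorSketch {
      HoodieCDCLogRecordIterator openCdcIterator(HoodieStorage storage,
                                                 HoodieLogFile[] cdcLogFiles,
                                                 HoodieSchema cdcSchema) {
        // Before: new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema.toAvroSchema())
        return new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema);
      }
    }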
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 34b0a3ec4f2b..7f71303c629e 100755
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.SerializableIndexedRecord;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -72,7 +73,6 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -447,7 +447,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.appendBlock(dataBlock);
writer.close();
- Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(),
SchemaTestUtil.getSimpleSchema().toAvroSchema());
+ Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(),
SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "We wrote a block, we should be able to read
it");
HoodieLogBlock nextBlock = reader.next();
assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next
block should be a data block");
@@ -494,7 +494,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
}
writer.close();
- Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(),
SchemaTestUtil.getSimpleSchema().toAvroSchema(), true);
+ Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(),
SchemaTestUtil.getSimpleSchema(), true);
assertTrue(reader.hasNext(), "We wrote a block, we should be able to read
it");
HoodieLogBlock nextBlock = reader.next();
assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next
block should be a data block");
@@ -572,7 +572,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.close();
Reader reader =
- HoodieLogFormat.newReader(storage, writer.getLogFile(),
SchemaTestUtil.getSimpleSchema().toAvroSchema());
+ HoodieLogFormat.newReader(storage, writer.getLogFile(),
SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
HoodieLogBlock nextBlock = reader.next();
HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
@@ -655,7 +655,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.appendBlock(dataBlock);
writer.close();
- Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(),
cdcSchema.toAvroSchema());
+ Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(),
cdcSchema);
assertTrue(reader.hasNext());
HoodieLogBlock block = reader.next();
HoodieDataBlock dataBlockRead = (HoodieDataBlock) block;
@@ -715,7 +715,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
boolean preTableVersion8)
throws IOException, URISyntaxException, InterruptedException {
// Generate 4 delta-log files w/ random records
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0,
400);
@@ -749,7 +749,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
List<IndexedRecord> scannedRecords = new ArrayList<>();
for (HoodieRecord record : scanner) {
- Object data = record.toIndexedRecord(schema,
CollectionUtils.emptyProps()).get().getData();
+ Object data = record.toIndexedRecord(schema.toAvroSchema(),
CollectionUtils.emptyProps()).get().getData();
if (data instanceof SerializableIndexedRecord) {
scannedRecords.add(((SerializableIndexedRecord) data).getData());
} else {
@@ -771,7 +771,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
boolean
enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
// Generate 3 delta-log files w/ random records
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0,
300);
@@ -826,7 +826,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
for (HoodieRecord record : scanner) {
scannedHoodieRecords.add(record);
scannedAvroRecords.add((IndexedRecord) ((SerializableIndexedRecord)
- ((HoodieAvroRecord)
record).getData().getInsertValue(schema).get()).getData());
+ ((HoodieAvroRecord)
record).getData().getInsertValue(schema.toAvroSchema()).get()).getData());
}
assertEquals(sort(sampledRecords), sort(scannedAvroRecords));
@@ -859,7 +859,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
boolean
enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
// Generate 3 delta-log files w/ random records
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0,
300);
@@ -911,7 +911,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
for (HoodieRecord record : scanner) {
scannedHoodieRecords.add(record);
scannedAvroRecords.add((IndexedRecord) ((SerializableIndexedRecord)
- ((HoodieAvroRecord)
record).getData().getInsertValue(schema).get()).getData());
+ ((HoodieAvroRecord)
record).getData().getInsertValue(schema.toAvroSchema()).get()).getData());
}
assertEquals(sort(sampledRecords), sort(scannedAvroRecords));
@@ -960,7 +960,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
logFile = appendValidBlock(logFile.getPath(), "test-fileId1", "100", 10);
// First round of reads - we should be able to read the first block and
then EOF
- Reader reader = HoodieLogFormat.newReader(storage, logFile,
SchemaTestUtil.getSimpleSchema().toAvroSchema());
+ Reader reader = HoodieLogFormat.newReader(storage, logFile,
SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -991,7 +991,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
logFile = appendValidBlock(logFile.getPath(), "test-fileId1", "100", 100);
// Second round of reads - we should be able to read the first and last
block
- reader = HoodieLogFormat.newReader(storage, logFile,
SchemaTestUtil.getSimpleSchema().toAvroSchema());
+ reader = HoodieLogFormat.newReader(storage, logFile,
SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
@@ -1047,7 +1047,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
logFile = appendValidBlock(logFile.getPath(), "test-fileId1", "100", 10);
// First round of reads - we should be able to read the first block and
then EOF
- Reader reader = HoodieLogFormat.newReader(storage, logFile,
SchemaTestUtil.getSimpleSchema().toAvroSchema());
+ Reader reader = HoodieLogFormat.newReader(storage, logFile,
SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -1130,7 +1130,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
appendValidBlock(writer.getLogFile().getPath(), "test-fileid1", "100", 10);
// Read data and corrupt block
- Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(),
SchemaTestUtil.getSimpleSchema().toAvroSchema());
+ Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(),
SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -1151,7 +1151,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
boolean isCompressionEnabled,
boolean
enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1163,7 +1163,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Write 1
List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -1174,7 +1174,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Write 2
List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
writer.appendBlock(dataBlock);
@@ -1196,7 +1196,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
boolean
isCompressionEnabled,
boolean
enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1207,7 +1207,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -1237,7 +1237,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords3 = records3.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records3, header);
writer.appendBlock(dataBlock);
@@ -1260,7 +1260,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
boolean
isCompressionEnabled,
boolean
enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1271,7 +1271,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1307,7 +1307,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords3 = records3.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records3, header);
@@ -1331,7 +1331,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
boolean
isCompressionEnabled,
boolean
enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1342,7 +1342,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1353,7 +1353,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
writer.appendBlock(dataBlock);
@@ -1405,7 +1405,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
scanner.forEach(s -> {
try {
- if (!((HoodieRecordPayload)
s.getData()).getInsertValue(schema).isPresent()) {
+ if (!((HoodieRecordPayload)
s.getData()).getInsertValue(schema.toAvroSchema()).isPresent()) {
emptyPayloads.add(true);
}
} catch (IOException io) {
@@ -1449,7 +1449,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
final List<Boolean> newEmptyPayloads = new ArrayList<>();
scanner.forEach(s -> {
try {
- if (!((HoodieRecordPayload)
s.getData()).getInsertValue(schema).isPresent()) {
+ if (!((HoodieRecordPayload)
s.getData()).getInsertValue(schema.toAvroSchema()).isPresent()) {
newEmptyPayloads.add(true);
}
} catch (IOException io) {
@@ -1474,7 +1474,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
boolean
isCompressionEnabled,
boolean
enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
String fileId = "test-fileid111";
Writer writer =
@@ -1486,7 +1486,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1497,7 +1497,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> allRecordsInserted = records2.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
dataBlock = getDataBlock(HoodieLogBlockType.AVRO_DATA_BLOCK, records2,
header);
writer.appendBlock(dataBlock);
allRecordsInserted.addAll(copyOfRecords1);
@@ -1561,7 +1561,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
final List<Boolean> newEmptyPayloads = new ArrayList<>();
scanner.forEach(s -> {
try {
- if (!((HoodieRecordPayload)
s.getData()).getInsertValue(schema).isPresent()) {
+ if (!((HoodieRecordPayload)
s.getData()).getInsertValue(schema.toAvroSchema()).isPresent()) {
newEmptyPayloads.add(true);
}
} catch (IOException io) {
@@ -1584,7 +1584,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
public void
testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType
diskMapType,
boolean
isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1595,7 +1595,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1606,7 +1606,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
writer.appendBlock(dataBlock);
@@ -1689,7 +1689,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
scanner.forEach(s -> readKeys.add(s.getRecordKey()));
scanner.forEach(s -> {
try {
- if (!((HoodieRecordPayload)
s.getData()).getInsertValue(schema).isPresent()) {
+ if (!((HoodieRecordPayload)
s.getData()).getInsertValue(schema.toAvroSchema()).isPresent()) {
emptyPayloadKeys.add(s.getRecordKey());
}
} catch (IOException io) {
@@ -1718,7 +1718,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
// Write a Data block and Delete block with same InstantTime (written in
same batch)
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1729,7 +1729,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
@@ -1788,7 +1788,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
// Write a Data block and Delete block with same InstantTime (written in
same batch)
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1799,7 +1799,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
@@ -1841,7 +1841,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
boolean
isCompressionEnabled,
boolean
enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1879,7 +1879,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
// Write a 3 Data blocs with same InstantTime (written in same batch)
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1890,7 +1890,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
@@ -1934,7 +1934,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
boolean
isCompressionEnabled,
boolean
enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1959,7 +1959,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
List<IndexedRecord> records1 =
testUtil.generateHoodieTestRecords(0, recordKeyList, "0000/00/00",
"100");
List<IndexedRecord> copyOfRecords1 = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1970,7 +1970,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())).collect(Collectors.toList());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
writer.appendBlock(dataBlock);
@@ -2031,7 +2031,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
scanner.forEach(s -> {
try {
- if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema, new
Properties()).isPresent()) {
+ if (!((HoodieRecordPayload)
s.getData()).getInsertValue(schema.toAvroSchema(), new
Properties()).isPresent()) {
emptyPayloads.add(true);
} else {
recordKeys.add(s.getKey().getRecordKey());
@@ -2058,7 +2058,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@Test
public void testAvroLogRecordReaderWithRollbackOlderBlocks()
throws IOException, URISyntaxException, InterruptedException {
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -2119,7 +2119,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
// Write a 3 Data blocs with same InstantTime (written in same batch)
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -2214,7 +2214,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// CRPB implies a corrupt block and CB implies a compacted block.
// Write a 3 Data blocks with same InstantTime (written in same batch)
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -2225,7 +2225,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
Set<String> recordKeysOfFirstTwoBatches = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())
.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toSet());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -2238,7 +2238,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Write 2nd data block
List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
recordKeysOfFirstTwoBatches.addAll(records2.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())
.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList()));
header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
@@ -2252,7 +2252,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
Set<String> recordKeysOfFirstThreeBatches = new
HashSet<>(recordKeysOfFirstTwoBatches);
recordKeysOfFirstThreeBatches.addAll(records3.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema.toAvroSchema())
.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList()));
header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
@@ -2415,7 +2415,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
boolean
enableOptimizedLogBlocksScan) {
try {
// Write one Data block with same InstantTime (written in same batch)
- Schema schema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+ HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> records = testUtil.generateHoodieTestRecords(0, 101);
List<IndexedRecord> records2 = new ArrayList<>(records);
@@ -2551,7 +2551,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
FileCreateUtilsLegacy.createDeltaCommit(basePath, "100", storage);
HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(),
storage.getPathInfo(writer.getLogFile().getPath()).getLength());
- try (HoodieLogFileReader reader = new HoodieLogFileReader(storage,
logFile, SchemaTestUtil.getSimpleSchema().toAvroSchema(), BUFFER_SIZE, true)) {
+ try (HoodieLogFileReader reader = new HoodieLogFileReader(storage,
logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, true)) {
assertTrue(reader.hasPrev(), "Last block should be available");
HoodieLogBlock prevBlock = reader.prev();
@@ -2636,7 +2636,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(),
storage.getPathInfo(writer.getLogFile().getPath()).getLength());
- try (HoodieLogFileReader reader = new HoodieLogFileReader(storage,
logFile, schema.toAvroSchema(), BUFFER_SIZE, true)) {
+ try (HoodieLogFileReader reader = new HoodieLogFileReader(storage,
logFile, schema, BUFFER_SIZE, true)) {
assertTrue(reader.hasPrev(), "Last block should be available");
HoodieLogBlock block = reader.prev();
@@ -2678,7 +2678,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(),
storage.getPathInfo(writer.getLogFile().getPath()).getLength());
try (HoodieLogFileReader reader =
- new HoodieLogFileReader(storage, logFile,
SchemaTestUtil.getSimpleSchema().toAvroSchema(), BUFFER_SIZE, true)) {
+ new HoodieLogFileReader(storage, logFile,
SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, true)) {
assertTrue(reader.hasPrev(), "Third block should be available");
reader.moveToPrev();
@@ -2710,11 +2710,11 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
List<IndexedRecord> recordsCopy =
convertAvroToSerializableIndexedRecords(new ArrayList<>(records));
assertEquals(100, records.size());
assertEquals(100, recordsCopy.size());
- HoodieAvroDataBlock dataBlock = new
HoodieAvroDataBlock(records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
schema.toAvroSchema());
+ HoodieAvroDataBlock dataBlock = new
HoodieAvroDataBlock(records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
schema);
byte[] content = dataBlock.getBytes(schema.toAvroSchema());
assertTrue(content.length > 0);
- HoodieLogBlock logBlock = HoodieAvroDataBlock.getBlock(content,
schema.toAvroSchema());
+ HoodieLogBlock logBlock = HoodieAvroDataBlock.getBlock(content, schema);
assertEquals(HoodieLogBlockType.AVRO_DATA_BLOCK, logBlock.getBlockType());
List<IndexedRecord> readRecords = getRecords((HoodieAvroDataBlock)
logBlock);
assertEquals(readRecords.size(), recordsCopy.size());
@@ -2767,9 +2767,9 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.appendBlock(dataBlock);
writer.close();
- Schema projectedSchema =
HoodieAvroUtils.generateProjectionSchema(schema.toAvroSchema(),
Collections.singletonList("name"));
+ HoodieSchema projectedSchema =
HoodieSchemaUtils.generateProjectionSchema(schema,
Collections.singletonList("name"));
- List<GenericRecord> projectedRecords =
HoodieAvroUtils.rewriteRecords(records, projectedSchema);
+ List<GenericRecord> projectedRecords =
HoodieAvroUtils.rewriteRecords(records, projectedSchema.toAvroSchema());
try (Reader reader = HoodieLogFormat.newReader(storage,
writer.getLogFile(), projectedSchema, false)) {
assertTrue(reader.hasNext(), "First block should be available");
@@ -2797,7 +2797,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
"Read records size should be equal to the written records size");
assertEquals(expectedRecords, recordsRead,
"Both records lists should be the same. (ordering guaranteed)");
- assertEquals(dataBlockRead.getSchema().toAvroSchema(), projectedSchema);
+ assertEquals(dataBlockRead.getSchema(), projectedSchema);
int bytesRead = (int) BenchmarkCounter.getBytesRead();
@@ -2926,7 +2926,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
outputStream.close();
// First round of reads - we should be able to read the first block and
then EOF
- Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(),
SchemaTestUtil.getSimpleSchema().toAvroSchema());
+ Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(),
SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
@@ -2934,7 +2934,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
return reader;
}
- private void checkLogBlocksAndKeys(String latestInstantTime, Schema schema,
ExternalSpillableMap.DiskMapType diskMapType,
+ private void checkLogBlocksAndKeys(String latestInstantTime, HoodieSchema
schema, ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled, boolean
enableOptimizedLogBlocksScan, int expectedTotalRecords,
int expectedTotalKeys,
Option<Set<String>> expectedKeys) throws IOException {
List<String> allLogFiles =
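
The recurring migration in TestHoodieLogFormat reduces to one pattern: build the table schema as HoodieSchema via HoodieSchemaUtils and call toAvroSchema() only at APIs that still speak Avro (record rewriting, payload getInsertValue). A minimal sketch, assuming SchemaTestUtil.getSimpleSchema() returns HoodieSchema after this change, as the updated call sites imply:

    import java.io.IOException;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.avro.HoodieAvroUtils;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaUtils;
    import org.apache.hudi.common.testutils.SchemaTestUtil;

    class LogFormatTestMigrationSketch {
      IndexedRecord rewriteWithMetadataFields(GenericRecord record) throws IOException {
        HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        // Avro-facing utilities keep their Schema parameters, so convert at the boundary.
        return HoodieAvroUtils.rewriteRecord(record, schema.toAvroSchema());
      }
    }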
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
index 5e37cbf08ae4..aa2b4ca4ba54 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -489,7 +490,7 @@ public class TestArchivedTimelineV1 extends
HoodieCommonTestHarness {
private void readAndValidateArchivedFile(String path, HoodieStorage storage)
throws IOException {
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(
- storage, new HoodieLogFile(path),
HoodieArchivedMetaEntry.getClassSchema())) {
+ storage, new HoodieLogFile(path),
HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index 55643d5b239d..fd903383343c 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -327,7 +328,7 @@ public class HoodieCommonTestHarness {
protected static List<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
Schema recordSchema,
- Schema writerSchema,
+ HoodieSchema writerSchema,
List<HoodieRecord>
records,
int numFiles,
HoodieStorage storage,
@@ -336,13 +337,13 @@ public class HoodieCommonTestHarness {
String commitTime)
throws IOException, InterruptedException {
List<IndexedRecord> indexedRecords = records.stream()
- .map(record -> (IndexedRecord)
record.rewriteRecordWithNewSchema(recordSchema, props, writerSchema).getData())
+ .map(record -> (IndexedRecord)
record.rewriteRecordWithNewSchema(recordSchema, props,
writerSchema.toAvroSchema()).getData())
.collect(Collectors.toList());
return writeLogFiles(partitionPath, writerSchema, indexedRecords,
numFiles, storage, fileId, commitTime, "100");
}
protected static List<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
- Schema schema,
+ HoodieSchema schema,
List<IndexedRecord>
records,
int numFiles,
HoodieStorage storage)
@@ -351,7 +352,7 @@ public class HoodieCommonTestHarness {
}
protected static List<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
- Schema schema,
+ HoodieSchema schema,
List<IndexedRecord>
records,
int numFiles,
HoodieStorage storage,
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 0eb13c6aa1ae..6f6cf310a0d4 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -259,7 +259,7 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
FileSlice fileSlice2 = new FileSlice(p, instant2, fileId1);
fileSlice2.setBaseFile(baseFile1);
StoragePath storagePath2 = new
StoragePath(partitionMetadataPath.getParent(),
hoodieTestTable.getLogFileNameById(fileId1, 1));
- writeLogFiles(new StoragePath(metaClient.getBasePath(), p),
HoodieTestDataGenerator.AVRO_SCHEMA,
HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS,
+ writeLogFiles(new StoragePath(metaClient.getBasePath(), p),
HoodieTestDataGenerator.AVRO_SCHEMA,
HoodieTestDataGenerator.HOODIE_SCHEMA_WITH_METADATA_FIELDS,
dataGen.generateInsertsForPartition(instant2, 10, p), 1,
metaClient.getStorage(), new Properties(), fileId1, instant2);
fileSlice2.addLogFile(new
HoodieLogFile(storagePath2.toUri().toString()));
partitionFileSlicePairs.add(Pair.of(p, fileSlice1));
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 6085773bf1d0..94bc8fac089f 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -103,7 +103,7 @@ public class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader
split.getPath().toString(), HadoopFSUtils.getStorageConf(jobConf)))
.withBasePath(split.getBasePath())
.withLogFilePaths(split.getDeltaLogPaths())
- .withReaderSchema(getLogScannerReaderSchema())
+
.withReaderSchema(HoodieSchema.fromAvroSchema(getLogScannerReaderSchema()))
.withLatestInstantTime(split.getMaxCommitTime())
.withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
.withReverseReader(false)
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 396705c15189..171eca08e99b 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -19,6 +19,7 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
@@ -81,7 +82,7 @@ class RealtimeUnmergedRecordReader extends
AbstractRealtimeRecordReader
split.getPath().toString(),
HadoopFSUtils.getStorageConf(this.jobConf)))
.withBasePath(split.getBasePath())
.withLogFilePaths(split.getDeltaLogPaths())
- .withReaderSchema(getReaderSchema())
+ .withReaderSchema(HoodieSchema.fromAvroSchema(getReaderSchema()))
.withLatestInstantTime(split.getMaxCommitTime())
.withReverseReader(false)
.withBufferSize(this.jobConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index a2e2af8b397b..6a28d2af703c 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -289,7 +289,7 @@ public class DFSHoodieDatasetInputReader extends
DFSDeltaInputReader {
.withLogFilePaths(
fileSlice.getLogFiles().map(l -> l.getPath().getName())
.collect(Collectors.toList()))
- .withReaderSchema(new Schema.Parser().parse(schemaStr))
+ .withReaderSchema(HoodieSchema.parse(schemaStr))
.withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().lastInstant().get().requestedTime())
.withMaxMemorySizeInBytes(
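
A short sketch of parsing a reader schema from its JSON string with the new entry point used above, replacing direct use of Avro's Schema.Parser at this call site; `schemaStr` is a placeholder:

    import org.apache.hudi.common.schema.HoodieSchema;

    class SchemaParseSketch {
      HoodieSchema parseReaderSchema(String schemaStr) {
        // Before: new org.apache.avro.Schema.Parser().parse(schemaStr)
        return HoodieSchema.parse(schemaStr);
      }
    }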
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 31a982e0910b..85ecad6002b9 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -451,7 +451,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
val cdcLogFiles = currentCDCFileSplit.getCdcFiles.asScala.map {
cdcFile =>
new HoodieLogFile(storage.getPathInfo(new StoragePath(basePath,
cdcFile)))
}.toArray
- cdcLogRecordIterator = new HoodieCDCLogRecordIterator(storage,
cdcLogFiles, cdcHoodieSchema.toAvroSchema)
+ cdcLogRecordIterator = new HoodieCDCLogRecordIterator(storage,
cdcLogFiles, cdcHoodieSchema)
case REPLACE_COMMIT =>
if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index 4ca87c42d3fb..5ba9960d1d3c 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
import org.apache.hudi.common.model.HoodieLogFile
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.log.HoodieLogFormat
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock
@@ -120,7 +121,7 @@ class ExportInstantsProcedure extends BaseProcedure with
ProcedureBuilder with L
for (fs <- statuses.asScala) {
// read the archived file
val reader = HoodieLogFormat.newReader(
- storage, new HoodieLogFile(convertToStoragePath(fs.getPath)),
HoodieArchivedMetaEntry.getClassSchema)
+ storage, new HoodieLogFile(convertToStoragePath(fs.getPath)),
HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema))
// read the avro blocks
while ( {
reader.hasNext && copyCount < limit
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index a986db8254af..1ffdd0d80cad 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieLogFile
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.TableSchemaResolver
import org.apache.hudi.common.table.log.HoodieLogFormat
import org.apache.hudi.common.table.log.block.{HoodieCorruptBlock,
HoodieDataBlock}
@@ -73,7 +74,7 @@ class ShowHoodieLogFileMetadataProcedure extends
BaseProcedure with ProcedureBui
logFilePath => {
val statuses = storage.listDirectEntries(new StoragePath(logFilePath))
val schema = TableSchemaResolver.readSchemaFromLogFile(storage, new
StoragePath(logFilePath))
- val reader = HoodieLogFormat.newReader(storage, new
HoodieLogFile(statuses.get(0).getPath), schema)
+ val reader = HoodieLogFormat.newReader(storage, new
HoodieLogFile(statuses.get(0).getPath), HoodieSchema.fromAvroSchema(schema))
// read the avro blocks
while (reader.hasNext) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 7dbf57c1925a..db72ec23a123 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -21,6 +21,7 @@ import org.apache.hudi.common.config.{HoodieCommonConfig,
HoodieMemoryConfig, Ho
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecordPayload}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.TableSchemaResolver
import org.apache.hudi.common.table.log.{HoodieLogFormat,
HoodieMergedLogRecordScanner}
import org.apache.hudi.common.table.log.block.HoodieDataBlock
@@ -68,7 +69,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure
with ProcedureBuil
ValidationUtils.checkArgument(logFilePaths.nonEmpty, "There is no log
file")
val allRecords: java.util.List[IndexedRecord] = new
java.util.ArrayList[IndexedRecord]
if (merge) {
- val schema =
Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new
StoragePath(logFilePaths.last)))
+ val schema =
HoodieSchema.fromAvroSchema(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage,
new StoragePath(logFilePaths.last))))
val scanner = HoodieMergedLogRecordScanner.newBuilder
.withStorage(storage)
.withBasePath(basePath)
@@ -83,7 +84,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure
with ProcedureBuil
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
.build
scanner.asScala.foreach(hoodieRecord => {
- val record =
hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema).get()
+ val record =
hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema.toAvroSchema).get()
if (allRecords.size() < limit) {
allRecords.add(record)
}
@@ -92,7 +93,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure
with ProcedureBuil
logFilePaths.toStream.takeWhile(_ => allRecords.size() < limit).foreach {
logFilePath => {
val schema =
Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new
StoragePath(logFilePath)))
- val reader = HoodieLogFormat.newReader(storage, new
HoodieLogFile(logFilePath), schema)
+ val reader = HoodieLogFormat.newReader(storage, new
HoodieLogFile(logFilePath), HoodieSchema.fromAvroSchema(schema))
while (reader.hasNext) {
val block = reader.next()
block match {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 52ef94ab85fa..514cec157265 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -608,7 +608,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     WriteClientTestUtils.startCommitWithTime(client, commitTime1);
     List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 100);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 10);
-    BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new RDDCustomColumnsSortPartitioner(new String[] {"rider"}, HoodieTestDataGenerator.AVRO_SCHEMA, config);
+    BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new RDDCustomColumnsSortPartitioner(new String[] {"rider"}, HoodieTestDataGenerator.HOODIE_SCHEMA, config);
     List<WriteStatus> statuses = client.bulkInsert(insertRecordsRDD1, commitTime1, Option.of(partitioner)).collect();
     assertNoWriteErrors(statuses);
   }
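
In the test change above, the bulk-insert partitioner now receives the HoodieSchema constant (HOODIE_SCHEMA) instead of the Avro one. A short sketch of constructing that partitioner against the new signature; the helper method, its arguments, and the import locations are assumptions for illustration, while the constructor shape mirrors the diff.

    // Illustrative sketch: RDDCustomColumnsSortPartitioner now takes HoodieSchema; not part of the change.
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
    import org.apache.hudi.table.BulkInsertPartitioner;
    import org.apache.spark.api.java.JavaRDD;

    class PartitionerSketch {
      // Sorts incoming records by the "rider" column before bulk insert, as in the test above.
      static BulkInsertPartitioner<JavaRDD<HoodieRecord>> riderSortPartitioner(HoodieSchema schema, HoodieWriteConfig config) {
        return new RDDCustomColumnsSortPartitioner(new String[] {"rider"}, schema, config);
      }
    }
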
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index 69553b8f319e..f000a7a14232 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -229,9 +229,9 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
     JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
-    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, config),
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.HOODIE_SCHEMA, config),
        records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator), true);
-    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, config),
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.HOODIE_SCHEMA, config),
        records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator), true);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 44703b04d6ca..8d51c21fbcf7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -1405,7 +1405,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
     try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
-        new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+        new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
       while (logFileReader.hasNext()) {
         HoodieLogBlock logBlock = logFileReader.next();
         if (logBlock instanceof HoodieDataBlock) {
@@ -3955,7 +3955,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
     try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
-        new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+        new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
       while (logFileReader.hasNext()) {
         HoodieLogBlock logBlock = logFileReader.next();
         if (logBlock instanceof HoodieDataBlock) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
index ed961747692b..0279bbe44d88 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
@@ -261,7 +261,7 @@ public class TestMergeHandle extends BaseTestHandle {
     String cdcFilePath = metaClient.getBasePath().toString() + "/" + writeStatus.getStat().getCdcStats().keySet().stream().findFirst().get();
     HoodieSchema cdcSchema = schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY, HOODIE_SCHEMA);
     int recordKeyFieldIndex = cdcSchema.getField("record_key").get().pos();
-    try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(cdcFilePath), cdcSchema.toAvroSchema())) {
+    try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(cdcFilePath), cdcSchema)) {
      while (reader.hasNext()) {
        HoodieLogBlock logBlock = reader.next();
        if (logBlock instanceof HoodieDataBlock) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 8b524b1ffb10..93ae04e5df29 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -95,7 +95,7 @@ import static org.apache.hudi.common.model.HoodieWriteStat.NULL_COMMIT;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.HOODIE_SCHEMA;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
@@ -607,7 +607,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
         .filter(path -> FSUtils.isLogFile(path.getName()))
         .forEach(logFilePath -> {
           try {
-            HoodieLogFileReader reader = new HoodieLogFileReader(hoodieStorage(), new HoodieLogFile(logFilePath), AVRO_SCHEMA, 10000, false,
+            HoodieLogFileReader reader = new HoodieLogFileReader(hoodieStorage(), new HoodieLogFile(logFilePath), HOODIE_SCHEMA, 10000, false,
                false, "_row_key", null);
            Map<HoodieLogBlock.HeaderMetadataType, String> headers = Collections.emptyMap();
            while (reader.hasNext()) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 9aa5767b7b98..a06c3533d9b0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -352,7 +352,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
                               shouldContainRecordPosition: Boolean,
                               logFileList: List[HoodieLogFile],
                               shouldBaseFileInstantTimeMatch: Boolean): Unit = {
-    val schema = new TableSchemaResolver(metaClient).getTableAvroSchema
+    val schema = new TableSchemaResolver(metaClient).getTableSchema
     val fsv = FileSystemViewManager.createInMemoryFileSystemView(
       context(), metaClient, HoodieMetadataConfig.newBuilder().build())
     logFileList.foreach(filename => {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
index a17f68b2479c..d8a2a99ee328 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
@@ -120,7 +120,7 @@ abstract class HoodieCDCTestBase extends HoodieSparkClientTestBase {
   protected def getCDCBlocks(relativeLogFile: String, cdcSchema: HoodieSchema): List[HoodieDataBlock] = {
     val logFile = new HoodieLogFile(
       metaClient.getStorage.getPathInfo(new StoragePath(metaClient.getBasePath, relativeLogFile)))
-    val reader = HoodieLogFormat.newReader(storage, logFile, cdcSchema.toAvroSchema)
+    val reader = HoodieLogFormat.newReader(storage, logFile, cdcSchema)
     val blocks = scala.collection.mutable.ListBuffer.empty[HoodieDataBlock]
     while(reader.hasNext) {
       blocks.asJava.add(reader.next().asInstanceOf[HoodieDataBlock])
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 40dd8f4c8a44..5a93fcef6e2c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -454,11 +454,11 @@ object HoodieSparkSqlTestBase {
     val logFilePathList: java.util.List[String] = HoodieTestUtils.getLogFileListFromFileSlice(fileSlice.get)
     Collections.sort(logFilePathList)
     var deleteLogBlockFound = false
-    val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
+    val schema = new TableSchemaResolver(metaClient).getTableSchema
     for (i <- 0 until logFilePathList.size()) {
       val logReader = new HoodieLogFileReader(
         metaClient.getStorage, new HoodieLogFile(logFilePathList.get(i)),
-        avroSchema, 1024 * 1024, false, false,
+        schema, 1024 * 1024, false, false,
         "id", null)
       assertTrue(logReader.hasNext)
       val logBlock = logReader.next()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
index 735542585235..fc08271bc9b1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig, RecordMergeMode}
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion, TableSchemaResolver}
 import org.apache.hudi.common.table.log.HoodieLogFileReader
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType
@@ -743,11 +744,11 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
     val logFilePathList: List[String] = HoodieTestUtils.getLogFileListFromFileSlice(fileSlice.get)
     Collections.sort(logFilePathList)
-    val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
+    val schema = new TableSchemaResolver(metaClient).getTableSchema
     for (i <- 0 until expectedNumLogFile) {
       val logReader = new HoodieLogFileReader(
         metaClient.getStorage, new HoodieLogFile(logFilePathList.get(i)),
-        avroSchema, 1024 * 1024, false, false,
+        schema, 1024 * 1024, false, false,
         "id", null)
       assertTrue(logReader.hasNext)
       val logBlockHeader = logReader.next().getLogBlockHeader
@@ -758,9 +759,9 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
       } else {
         assertFalse(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
       }
-      val actualSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
-      val expectedSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
-        avroSchema, changedFields(i).asJava), false)
+      val actualSchema = HoodieSchema.parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
+      val expectedSchema = HoodieSchemaUtils.addMetadataFields(HoodieSchemaUtils.generateProjectionSchema(
+        schema, changedFields(i).asJava), false)
       assertEquals(expectedSchema, actualSchema)
     }
   }
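
The assertions above depend on the schema string that the log writer stores in each block's SCHEMA header: HoodieSchema.parse rebuilds a HoodieSchema from that header so it can be compared with a projection built via HoodieSchemaUtils. A compact Java sketch of that round trip; the header map, the projected field names, and the wrapper class are illustrative assumptions, while the parse and projection calls mirror the diff.

    // Illustrative sketch: rebuild a block's schema from its SCHEMA header and compare it with
    // an expected projection; not part of the change.
    import java.util.Arrays;
    import java.util.Map;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaUtils;
    import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;

    class SchemaHeaderSketch {
      static boolean blockMatchesProjection(Map<HeaderMetadataType, String> header, HoodieSchema tableSchema) {
        // The writer serializes the block schema into the SCHEMA header as a string.
        HoodieSchema actual = HoodieSchema.parse(header.get(HeaderMetadataType.SCHEMA));
        // Expected partial-update schema: the projected columns plus the Hudi metadata fields.
        HoodieSchema expected = HoodieSchemaUtils.addMetadataFields(
            HoodieSchemaUtils.generateProjectionSchema(tableSchema, Arrays.asList("id", "price")), false);
        return expected.equals(actual);
      }
    }
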
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 5eb4b7e2158e..24de148ca1c5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1662,7 +1662,7 @@ public class HoodieMetadataTableValidator implements Serializable {
           continue;
         }
         reader =
-            HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePathStr), readerSchema.toAvroSchema(), false);
+            HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePathStr), readerSchema, false);
         // read the avro blocks
         if (reader.hasNext()) {
           HoodieLogBlock block = reader.next();