This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 19d29ac [HUDI-741] Added checks to validate Hoodie's schema evolution.
19d29ac is described below
commit 19d29ac7d0ac451324d25bfe85b51b1eca10bc67
Author: Prashant Wason <[email protected]>
AuthorDate: Fri Mar 27 00:53:49 2020 -0700
[HUDI-741] Added checks to validate Hoodie's schema evolution.
HUDI-specific validation of schema evolution should ensure that a newer
schema can be used for the dataset by checking that data written using the
old schema can be read using the new schema.
Code changes:
1. Added a new config in HoodieWriteConfig to enable schema validation
check (disabled by default)
2. Moved code that reads schema from base/log files into hudi-common from
hudi-hive-sync
3. Added writerSchema to the extraMetadata of compaction commits in MOR
tables. This is the same as what is done for commits on COW tables.
Testing changes:
4. Extended TestHoodieClientBase to add insertBatch API which allows
inserting a new batch of unique records into a HUDI table
5. Added a unit test to verify schema evolution for both COW and MOR tables.
6. Added unit tests for schema compatibility checks.
---
.../hudi/cli/commands/HoodieLogFileCommand.java | 8 +-
.../org/apache/hudi/client/HoodieWriteClient.java | 64 ++-
.../org/apache/hudi/config/HoodieWriteConfig.java | 12 +
.../org/apache/hudi/io/HoodieAppendHandle.java | 2 -
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 6 +-
.../apache/hudi/client/TestHoodieClientBase.java | 28 ++
.../TestHoodieClientOnCopyOnWriteStorage.java | 6 +-
.../hudi/client/TestTableSchemaEvolution.java | 487 +++++++++++++++++++++
.../hudi/common/HoodieTestDataGenerator.java | 19 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 8 +
.../hudi/common/table/TableSchemaResolver.java | 360 +++++++++++++++
.../java/org/apache/hudi/hive/HiveSyncTool.java | 4 +-
.../org/apache/hudi/hive/HoodieHiveClient.java | 129 +-----
.../util/{SchemaUtil.java => HiveSchemaUtil.java} | 33 +-
.../org/apache/hudi/hive/TestHiveSyncTool.java | 18 +-
15 files changed, 1004 insertions(+), 180 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 0d67da7..db1ab16 100644
---
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -36,7 +37,6 @@ import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
-import org.apache.hudi.hive.util.SchemaUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
@@ -90,7 +90,7 @@ public class HoodieLogFileCommand implements CommandMarker {
for (String logFilePath : logFilePaths) {
FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath));
Schema writerSchema = new AvroSchemaConverter()
- .convert(Objects.requireNonNull(SchemaUtil.readSchemaFromLogFile(fs,
new Path(logFilePath))));
+
.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs,
new Path(logFilePath))));
Reader reader = HoodieLogFormat.newReader(fs, new
HoodieLogFile(fsStatus[0].getPath()), writerSchema);
// read the avro blocks
@@ -179,7 +179,7 @@ public class HoodieLogFileCommand implements CommandMarker {
AvroSchemaConverter converter = new AvroSchemaConverter();
// get schema from last log file
Schema readerSchema =
-
converter.convert(Objects.requireNonNull(SchemaUtil.readSchemaFromLogFile(fs,
new Path(logFilePaths.get(logFilePaths.size() - 1)))));
+
converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs,
new Path(logFilePaths.get(logFilePaths.size() - 1)))));
List<IndexedRecord> allRecords = new ArrayList<>();
@@ -202,7 +202,7 @@ public class HoodieLogFileCommand implements CommandMarker {
} else {
for (String logFile : logFilePaths) {
Schema writerSchema = new AvroSchemaConverter()
-
.convert(Objects.requireNonNull(SchemaUtil.readSchemaFromLogFile(client.getFs(),
new Path(logFile))));
+
.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(),
new Path(logFile))));
HoodieLogFormat.Reader reader =
HoodieLogFormat.newReader(fs, new HoodieLogFile(new
Path(logFile)), writerSchema);
// read the avro blocks
diff --git
a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 1fd34a3..c7de8df 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -19,6 +19,8 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
+import org.apache.avro.Schema;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -36,6 +38,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -50,9 +53,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
+import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieCommitArchiveLog;
@@ -166,6 +171,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final
String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
+ validateSchema(table, true);
setOperationType(WriteOperationType.UPSERT);
HoodieWriteMetadata result = table.upsert(jsc,instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
@@ -185,6 +191,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>>
preppedRecords, final String instantTime) {
HoodieTable<T> table =
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
+ validateSchema(table, true);
setOperationType(WriteOperationType.UPSERT_PREPPED);
HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime,
preppedRecords);
return postWrite(result, instantTime, table);
@@ -202,6 +209,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final
String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
+ validateSchema(table, false);
setOperationType(WriteOperationType.INSERT);
HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
return postWrite(result, instantTime, table);
@@ -220,6 +228,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>>
preppedRecords, final String instantTime) {
HoodieTable<T> table =
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
+ validateSchema(table, false);
setOperationType(WriteOperationType.INSERT_PREPPED);
HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime,
preppedRecords);
return postWrite(result, instantTime, table);
@@ -882,6 +891,8 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
+ metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
+
// Finalize write
finalizeWrite(table, compactionCommitTime, updateStatusMap);
@@ -919,4 +930,55 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
});
return compactionInstantTimeOpt;
}
-}
+
+ /**
+ * Ensure that the current writerSchema is compatible with the latest schema
of this dataset.
+ *
+ * When inserting/updating data, we read records using the last used schema
and convert them to the
+ * GenericRecords with writerSchema. Hence, we need to ensure that this
conversion can take place without errors.
+ *
+ * @param hoodieTable The Hoodie Table
+ * @param isUpsert If this is a check during upserts
+ * @throws HoodieUpsertException If schema check fails during upserts
+ * @throws HoodieInsertException If schema check fails during inserts
+ */
+ private void validateSchema(HoodieTable<T> hoodieTable, final boolean
isUpsert)
+ throws HoodieUpsertException, HoodieInsertException {
+
+ if (!getConfig().getAvroSchemaValidate()) {
+ // Check not required
+ return;
+ }
+
+ boolean isValid = false;
+ String errorMsg = "WriterSchema is not compatible with the schema present
in the Table";
+ Throwable internalError = null;
+ Schema tableSchema = null;
+ Schema writerSchema = null;
+ try {
+ TableSchemaResolver schemaUtil = new
TableSchemaResolver(hoodieTable.getMetaClient());
+ writerSchema =
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
+ tableSchema =
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
+ isValid = schemaUtil.isSchemaCompatible(tableSchema, writerSchema);
+ } catch (Exception e) {
+ // Two error cases are possible:
+ // 1. There was no schema as no data has been inserted yet (first time
only)
+ // 2. Failure in reading the schema
+ isValid =
hoodieTable.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants()
== 0;
+ errorMsg = "Failed to read latest schema on path " + basePath;
+ internalError = e;
+ }
+
+ if (!isValid) {
+ LOG.error(errorMsg);
+ LOG.warn("WriterSchema: " + writerSchema);
+ LOG.warn("Table latest schema: " + tableSchema);
+ if (isUpsert) {
+ throw new HoodieUpsertException(errorMsg, internalError);
+ } else {
+ throw new HoodieInsertException(errorMsg, internalError);
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3db17c7..5ac87da 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -52,6 +52,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String TIMELINE_LAYOUT_VERSION =
"hoodie.timeline.layout.version";
private static final String BASE_PATH_PROP = "hoodie.base.path";
private static final String AVRO_SCHEMA = "hoodie.avro.schema";
+ private static final String AVRO_SCHEMA_VALIDATE =
"hoodie.avro.schema.validate";
+ private static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
private static final String DEFAULT_PARALLELISM = "1500";
private static final String INSERT_PARALLELISM =
"hoodie.insert.shuffle.parallelism";
private static final String BULKINSERT_PARALLELISM =
"hoodie.bulkinsert.shuffle.parallelism";
@@ -131,6 +133,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig
{
props.setProperty(AVRO_SCHEMA, schemaStr);
}
+ public boolean getAvroSchemaValidate() {
+ return Boolean.parseBoolean(props.getProperty(AVRO_SCHEMA_VALIDATE));
+ }
+
public String getTableName() {
return props.getProperty(TABLE_NAME);
}
@@ -577,6 +583,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig
{
return this;
}
+ public Builder withAvroSchemaValidate(boolean enable) {
+ props.setProperty(AVRO_SCHEMA_VALIDATE, String.valueOf(enable));
+ return this;
+ }
+
public Builder forTable(String tableName) {
props.setProperty(TABLE_NAME, tableName);
return this;
@@ -721,6 +732,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS));
setDefaultOnCondition(props,
!props.containsKey(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP),
FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP,
DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED);
+ setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE),
AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet,
HoodieIndexConfig.newBuilder().fromProperties(props).build());
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 980d731..cc91e89 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -191,8 +191,6 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload> extends HoodieWri
return Option.empty();
}
- // TODO (NA) - Perform a writerSchema check of current input record with the
last writerSchema on log file
- // to make sure we don't append records with older (shorter) writerSchema
than already appended
public void doAppend() {
while (recordItr.hasNext()) {
HoodieRecord record = recordItr.next();
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 0b1506d..08bb06e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -63,7 +63,7 @@ public abstract class HoodieWriteHandle<T extends
HoodieRecordPayload> extends H
this.partitionPath = partitionPath;
this.fileId = fileId;
this.originalSchema = new Schema.Parser().parse(config.getSchema());
- this.writerSchema = createHoodieWriteSchema(originalSchema);
+ this.writerSchema =
HoodieAvroUtils.createHoodieWriteSchema(originalSchema);
this.timer = new HoodieTimer().startTimer();
this.writeStatus = (WriteStatus)
ReflectionUtils.loadClass(config.getWriteStatusClassName(),
!hoodieTable.getIndex().isImplicitWithStorage(),
config.getWriteStatusFailureFraction());
@@ -78,10 +78,6 @@ public abstract class HoodieWriteHandle<T extends
HoodieRecordPayload> extends H
return FSUtils.makeWriteToken(getPartitionId(), getStageId(),
getAttemptId());
}
- public static Schema createHoodieWriteSchema(Schema originalSchema) {
- return HoodieAvroUtils.addMetadataFields(originalSchema);
- }
-
public Path makeNewPath(String partitionPath) {
Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
try {
diff --git
a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index ee76ed3..4015735 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -342,6 +342,34 @@ public class TestHoodieClientBase extends
HoodieClientTestHarness {
}
/**
+ * Helper to insert another batch of records and do regular assertions on
the state after successful completion.
+ *
+ * @param writeConfig Hoodie Write Config
+ * @param client Hoodie Write Client
+ * @param newCommitTime New Commit Timestamp to be used
+ * @param initCommitTime Begin Timestamp (usually "000")
+ * @param numRecordsInThisCommit Number of records to be added in the new
commit
+ * @param writeFn Write Function to be used for insertion
+ * @param isPreppedAPI Boolean flag to indicate writeFn expects
prepped records
+ * @param assertForCommit Enable Assertion of Writes
+ * @param expRecordsInThisCommit Expected number of records in this commit
+ * @param expTotalRecords Expected number of records when scanned
+ * @param expTotalCommits Expected number of commits (including this
commit)
+ * @return RDD of write-status
+ * @throws Exception in case of error
+ */
+ JavaRDD<WriteStatus> insertBatch(HoodieWriteConfig writeConfig,
HoodieWriteClient client, String newCommitTime,
+ String initCommitTime, int
numRecordsInThisCommit,
+ Function3<JavaRDD<WriteStatus>,
HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+ boolean assertForCommit, int
expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws
Exception {
+ final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+ generateWrapRecordsFn(isPreppedAPI, writeConfig,
dataGen::generateInserts);
+
+ return writeBatch(client, newCommitTime, initCommitTime, Option.empty(),
initCommitTime, numRecordsInThisCommit,
+ recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit,
expTotalRecords, expTotalCommits);
+ }
+
+ /**
* Helper to upsert batch of records and do regular assertions on the state
after successful completion.
*
* @param writeConfig Hoodie Write Config
diff --git
a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 7382b7e..0899474 100644
---
a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -199,15 +199,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
String recordKey = UUID.randomUUID().toString();
HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01");
HoodieRecord<TestRawTripPayload> recordOne =
- new HoodieRecord(keyOne,
HoodieTestDataGenerator.generateRandomValue(keyOne, newCommitTime));
+ new HoodieRecord(keyOne, dataGen.generateRandomValue(keyOne,
newCommitTime));
HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01");
HoodieRecord recordTwo =
- new HoodieRecord(keyTwo,
HoodieTestDataGenerator.generateRandomValue(keyTwo, newCommitTime));
+ new HoodieRecord(keyTwo, dataGen.generateRandomValue(keyTwo,
newCommitTime));
// Same key and partition as keyTwo
HoodieRecord recordThree =
- new HoodieRecord(keyTwo,
HoodieTestDataGenerator.generateRandomValue(keyTwo, newCommitTime));
+ new HoodieRecord(keyTwo, dataGen.generateRandomValue(keyTwo,
newCommitTime));
JavaRDD<HoodieRecord<TestRawTripPayload>> records =
jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
diff --git
a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
new file mode 100644
index 0000000..7b75d5a
--- /dev/null
+++
b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.TestRawTripPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.HoodieTestDataGenerator.FARE_NESTED_SCHEMA;
+import static org.apache.hudi.common.HoodieTestDataGenerator.MAP_TYPE_SCHEMA;
+import static org.apache.hudi.common.HoodieTestDataGenerator.TIP_NESTED_SCHEMA;
+import static
org.apache.hudi.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static
org.apache.hudi.common.HoodieTestDataGenerator.TRIP_SCHEMA_PREFIX;
+import static
org.apache.hudi.common.HoodieTestDataGenerator.TRIP_SCHEMA_SUFFIX;
+import static
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestTableSchemaEvolution extends TestHoodieClientBase {
+ private final String initCommitTime = "000";
+ private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+ private HoodieTestDataGenerator dataGenEvolved = new
HoodieTestDataGenerator();
+ private HoodieTestDataGenerator dataGenDevolved = new
HoodieTestDataGenerator();
+
+ public static final String EXTRA_FIELD_SCHEMA =
+ "{\"name\": \"new_field\", \"type\": \"boolean\", \"default\": false},";
+
+ // TRIP_EXAMPLE_SCHEMA with a new_field added
+ public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX
+ MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
+ + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
+
+ // TRIP_EXAMPLE_SCHEMA with tip field removed
+ public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX
+ MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
+ + TRIP_SCHEMA_SUFFIX;
+
+ @Before
+ public void setUp() throws Exception {
+ initResources();
+ }
+
+ @After
+ public void tearDown() {
+ cleanupSparkContexts();
+ }
+
+ @Test
+ public void testSchemaCompatibilityBasic() throws Exception {
+ assertTrue("Same schema is compatible",
+ TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA));
+
+ String reorderedSchema = TRIP_SCHEMA_PREFIX + TIP_NESTED_SCHEMA +
FARE_NESTED_SCHEMA
+ + MAP_TYPE_SCHEMA + TRIP_SCHEMA_SUFFIX;
+ assertTrue("Reordered fields are compatible",
+ TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
reorderedSchema));
+ assertTrue("Reordered fields are compatible",
+ TableSchemaResolver.isSchemaCompatible(reorderedSchema,
TRIP_EXAMPLE_SCHEMA));
+
+ String renamedSchema = TRIP_EXAMPLE_SCHEMA.replace("tip_history",
"tip_future");
+ assertFalse("Renamed fields are not compatible",
+ TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
renamedSchema));
+
+ assertFalse("Deleted single field is not compatible",
+ TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_DEVOLVED));
+ String deletedMultipleFieldSchema = TRIP_SCHEMA_PREFIX +
TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+ assertFalse("Deleted multiple fields are not compatible",
+ TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
deletedMultipleFieldSchema));
+
+ String renamedRecordSchema = TRIP_EXAMPLE_SCHEMA.replace("triprec",
"triprec_renamed");
+ assertFalse("Renamed record name is not compatible",
+ TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
renamedRecordSchema));
+
+ String swappedFieldSchema = TRIP_SCHEMA_PREFIX +
MAP_TYPE_SCHEMA.replace("city_to_state", "fare")
+ + FARE_NESTED_SCHEMA.replace("fare", "city_to_state") +
TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+ assertFalse("Swapped fields are not compatible",
+ TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
swappedFieldSchema));
+
+ String typeChangeSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA +
FARE_NESTED_SCHEMA
+ + TIP_NESTED_SCHEMA.replace("string", "boolean") + TRIP_SCHEMA_SUFFIX;
+ assertFalse("Field type change is not compatible",
+ TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
typeChangeSchema));
+
+ assertTrue("Added field with default is compatible (Evolved Schema)",
+ TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED));
+
+ String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA +
FARE_NESTED_SCHEMA
+ + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA +
EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field")
+ + TRIP_SCHEMA_SUFFIX;
+ assertTrue("Multiple added fields with defauls are compatible",
+ TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
multipleAddedFieldSchema));
+ }
+
+ @Test
+ public void testMORTable() throws Exception {
+ tableType = HoodieTableType.MERGE_ON_READ;
+ initMetaClient();
+
+ // Create the table
+ HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(),
metaClient.getBasePath(),
+ HoodieTableType.MERGE_ON_READ,
metaClient.getTableConfig().getTableName(),
+ metaClient.getArchivePath(),
metaClient.getTableConfig().getPayloadClass(), VERSION_1);
+
+ HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
+ HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+ // Initial inserts with TRIP_EXAMPLE_SCHEMA
+ int numRecords = 10;
+ insertFirstBatch(hoodieWriteConfig, client, "001", initCommitTime,
+ numRecords, HoodieWriteClient::insert, false, false,
numRecords);
+ checkLatestDeltaCommit("001");
+
+ // Compact once so we can incrementally read later
+ assertTrue(client.scheduleCompactionAtInstant("002", Option.empty()));
+ client.compact("002");
+
+ // Updates with same schema is allowed
+ final int numUpdateRecords = 5;
+ updateBatch(hoodieWriteConfig, client, "003", "002", Option.empty(),
+ initCommitTime, numUpdateRecords, HoodieWriteClient::upsert,
false, false, 0, 0, 0);
+ checkLatestDeltaCommit("003");
+ checkReadRecords("000", numRecords);
+
+ // Delete with same schema is allowed
+ final int numDeleteRecords = 2;
+ numRecords -= numDeleteRecords;
+ deleteBatch(hoodieWriteConfig, client, "004", "003", initCommitTime,
numDeleteRecords,
+ HoodieWriteClient::delete, false, false, 0, 0);
+ checkLatestDeltaCommit("004");
+ checkReadRecords("000", numRecords);
+
+ // Insert with evolved schema is not allowed
+ HoodieWriteConfig hoodieDevolvedWriteConfig =
getWriteConfig(TRIP_EXAMPLE_SCHEMA_DEVOLVED);
+ client = getHoodieWriteClient(hoodieDevolvedWriteConfig, false);
+ final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004",
numRecords, TRIP_EXAMPLE_SCHEMA_DEVOLVED);
+ try {
+ // We cannot use insertBatch directly here because we want to insert
records
+ // with a devolved schema and insertBatch inserts records using the
TRIP_EXMPLE_SCHEMA.
+ writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
+ (String s, Integer a) -> failedRecords, HoodieWriteClient::insert,
false, 0, 0, 0);
+ fail("Insert with devolved scheme should fail");
+ } catch (HoodieInsertException ex) {
+ // no new commit
+ checkLatestDeltaCommit("004");
+ checkReadRecords("000", numRecords);
+ client.rollback("005");
+ }
+
+ // Update with devolved schema is also not allowed
+ try {
+ updateBatch(hoodieDevolvedWriteConfig, client, "005", "004",
Option.empty(),
+ initCommitTime, numUpdateRecords, HoodieWriteClient::upsert,
false, false, 0, 0, 0);
+ fail("Update with devolved scheme should fail");
+ } catch (HoodieUpsertException ex) {
+ // no new commit
+ checkLatestDeltaCommit("004");
+ checkReadRecords("000", numRecords);
+ client.rollback("005");
+ }
+
+ // Insert with an evolved scheme is allowed
+ HoodieWriteConfig hoodieEvolvedWriteConfig =
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED);
+ client = getHoodieWriteClient(hoodieEvolvedWriteConfig, false);
+
+ // We cannot use insertBatch directly here because we want to insert
records
+ // with a evolved schemaand insertBatch inserts records using the
TRIP_EXMPLE_SCHEMA.
+ final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("005",
numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
+ writeBatch(client, "005", "004", Option.empty(), initCommitTime,
numRecords,
+ (String s, Integer a) -> evolvedRecords, HoodieWriteClient::insert,
false, 0, 0, 0);
+
+ // new commit
+ checkLatestDeltaCommit("005");
+ checkReadRecords("000", 2 * numRecords);
+
+ // Updates with evolved schema is allowed
+ final List<HoodieRecord> updateRecords = generateUpdatesWithSchema("006",
numUpdateRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
+ writeBatch(client, "006", "005", Option.empty(), initCommitTime,
+ numUpdateRecords, (String s, Integer a) -> updateRecords,
HoodieWriteClient::upsert, false, 0, 0, 0);
+ // new commit
+ checkLatestDeltaCommit("006");
+ checkReadRecords("000", 2 * numRecords);
+
+ // Now even the original schema cannot be used for updates as it is
devolved in relation to the
+ // current schema of the dataset.
+ client = getHoodieWriteClient(hoodieWriteConfig, false);
+ try {
+ updateBatch(hoodieWriteConfig, client, "007", "006", Option.empty(),
+ initCommitTime, numUpdateRecords, HoodieWriteClient::upsert,
false, false, 0, 0, 0);
+ fail("Update with original scheme should fail");
+ } catch (HoodieUpsertException ex) {
+ // no new commit
+ checkLatestDeltaCommit("006");
+ checkReadRecords("000", 2 * numRecords);
+ client.rollback("007");
+ }
+
+ // Now even the original schema cannot be used for inserts as it is
devolved in relation to the
+ // current schema of the dataset.
+ try {
+ // We are not using insertBatch directly here because insertion of these
+ // records will fail and we dont want to keep these records within
HoodieTestDataGenerator as we
+ // will be testing updates later.
+ failedRecords.clear();
+ failedRecords.addAll(dataGen.generateInserts("007", numRecords));
+ writeBatch(client, "007", "006", Option.empty(), initCommitTime,
numRecords,
+ (String s, Integer a) -> failedRecords, HoodieWriteClient::insert,
true, numRecords, numRecords, 1);
+ fail("Insert with original scheme should fail");
+ } catch (HoodieInsertException ex) {
+ // no new commit
+ checkLatestDeltaCommit("006");
+ checkReadRecords("000", 2 * numRecords);
+ client.rollback("007");
+
+ // Remove the inserts from the in-memory state of HoodieTestDataGenerator
+ // as these records were never inserted in the dataset. This is required
so
+ // that future calls to updateBatch or deleteBatch do not generate
updates
+ // or deletes for records which do not even exist.
+ for (HoodieRecord record : failedRecords) {
+ assertTrue(dataGen.deleteExistingKeyIfPresent(record.getKey()));
+ }
+ }
+
+ // Rollback to the original schema
+ client.restoreToInstant("004");
+ checkLatestDeltaCommit("004");
+
+ // Updates with original schema are now allowed
+ client = getHoodieWriteClient(hoodieWriteConfig, false);
+ updateBatch(hoodieWriteConfig, client, "008", "004", Option.empty(),
+ initCommitTime, numUpdateRecords, HoodieWriteClient::upsert,
false, false, 0, 0, 0);
+ // new commit
+ checkLatestDeltaCommit("008");
+ checkReadRecords("000", 2 * numRecords);
+
+ // Insert with original schema is allowed now
+ insertBatch(hoodieWriteConfig, client, "009", "008", numRecords,
HoodieWriteClient::insert,
+ false, false, 0, 0, 0);
+ checkLatestDeltaCommit("009");
+ checkReadRecords("000", 3 * numRecords);
+ }
+
+ @Test
+ public void testCopyOnWriteTable() throws Exception {
+ // Create the table
+ HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(),
metaClient.getBasePath(),
+ HoodieTableType.COPY_ON_WRITE,
metaClient.getTableConfig().getTableName(),
+ metaClient.getArchivePath(),
metaClient.getTableConfig().getPayloadClass(), VERSION_1);
+
+ HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
+ HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+ // Initial inserts with TRIP_EXAMPLE_SCHEMA
+ int numRecords = 10;
+ insertFirstBatch(hoodieWriteConfig, client, "001", initCommitTime,
+ numRecords, HoodieWriteClient::insert, false, true,
numRecords);
+ checkReadRecords("000", numRecords);
+
+ // Updates with same schema is allowed
+ final int numUpdateRecords = 5;
+ updateBatch(hoodieWriteConfig, client, "002", "001", Option.empty(),
+ initCommitTime, numUpdateRecords, HoodieWriteClient::upsert,
false, true,
+ numUpdateRecords, numRecords, 2);
+ checkReadRecords("000", numRecords);
+
+ // Delete with same schema is allowed
+ final int numDeleteRecords = 2;
+ numRecords -= numDeleteRecords;
+ deleteBatch(hoodieWriteConfig, client, "003", "002", initCommitTime,
numDeleteRecords,
+ HoodieWriteClient::delete, false, true, 0, numRecords);
+ checkReadRecords("000", numRecords);
+
+ // Insert with devolved schema is not allowed
+ HoodieWriteConfig hoodieDevolvedWriteConfig =
getWriteConfig(TRIP_EXAMPLE_SCHEMA_DEVOLVED);
+ client = getHoodieWriteClient(hoodieDevolvedWriteConfig, false);
+ final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004",
numRecords, TRIP_EXAMPLE_SCHEMA_DEVOLVED);
+ try {
+ // We cannot use insertBatch directly here because we want to insert
records
+ // with a devolved schema.
+ writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
+ (String s, Integer a) -> failedRecords, HoodieWriteClient::insert,
true, numRecords, numRecords, 1);
+ fail("Insert with devolved scheme should fail");
+ } catch (HoodieInsertException ex) {
+ // no new commit
+ HoodieTimeline curTimeline =
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+ assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003"));
+ client.rollback("004");
+ }
+
+ // Update with devolved schema is not allowed
+ try {
+ updateBatch(hoodieDevolvedWriteConfig, client, "004", "003",
Option.empty(),
+ initCommitTime, numUpdateRecords, HoodieWriteClient::upsert,
false, true,
+ numUpdateRecords, 2 * numRecords, 5);
+ fail("Update with devolved scheme should fail");
+ } catch (HoodieUpsertException ex) {
+ // no new commit
+ HoodieTimeline curTimeline =
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+ assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003"));
+ client.rollback("004");
+ }
+
+ // Insert with evolved scheme is allowed
+ HoodieWriteConfig hoodieEvolvedWriteConfig =
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED);
+ client = getHoodieWriteClient(hoodieEvolvedWriteConfig, false);
+ final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("004",
numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
+ // We cannot use insertBatch directly here because we want to insert
records
+ // with a evolved schema.
+ writeBatch(client, "004", "003", Option.empty(), initCommitTime,
numRecords,
+ (String s, Integer a) -> evolvedRecords, HoodieWriteClient::insert,
true, numRecords, 2 * numRecords, 4);
+ // new commit
+ HoodieTimeline curTimeline =
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+ assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("004"));
+ checkReadRecords("000", 2 * numRecords);
+
+ // Updates with evolved schema is allowed
+ final List<HoodieRecord> updateRecords = generateUpdatesWithSchema("005",
numUpdateRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
+ writeBatch(client, "005", "004", Option.empty(), initCommitTime,
+ numUpdateRecords, (String s, Integer a) -> updateRecords,
HoodieWriteClient::upsert, true, numUpdateRecords, 2 * numRecords, 5);
+ checkReadRecords("000", 2 * numRecords);
+
+ // Now even the original schema cannot be used for updates as it is
devolved
+ // in relation to the current schema of the dataset.
+ client = getHoodieWriteClient(hoodieWriteConfig, false);
+ try {
+ updateBatch(hoodieWriteConfig, client, "006", "005", Option.empty(),
+ initCommitTime, numUpdateRecords, HoodieWriteClient::upsert,
false, true,
+ numUpdateRecords, numRecords, 2);
+ fail("Update with original scheme should fail");
+ } catch (HoodieUpsertException ex) {
+ // no new commit
+ curTimeline =
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+ assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("005"));
+ client.rollback("006");
+ }
+
+ // Now even the original schema cannot be used for inserts as it is
devolved
+ // in relation to the current schema of the dataset.
+ try {
+ // We are not using insertBatch directly here because insertion of these
+ // records will fail and we dont want to keep these records within
+ // HoodieTestDataGenerator.
+ failedRecords.clear();
+ failedRecords.addAll(dataGen.generateInserts("006", numRecords));
+ writeBatch(client, "006", "005", Option.empty(), initCommitTime,
numRecords,
+ (String s, Integer a) -> failedRecords, HoodieWriteClient::insert,
true, numRecords, numRecords, 1);
+ fail("Insert with original scheme should fail");
+ } catch (HoodieInsertException ex) {
+ // no new commit
+ curTimeline =
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+ assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("005"));
+ client.rollback("006");
+
+ // Remove the inserts from the in-memory state of HoodieTestDataGenerator
+ // as these records were never inserted in the dataset. This is required
so
+ // that future calls to updateBatch or deleteBatch do not generate
updates
+ // or deletes for records which do not even exist.
+ for (HoodieRecord record : failedRecords) {
+ assertTrue(dataGen.deleteExistingKeyIfPresent(record.getKey()));
+ }
+ }
+
+ // Revert to the older commit and ensure that the original schema can now
+ // be used for inserts and inserts.
+ client.restoreToInstant("003");
+ curTimeline =
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+ assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003"));
+ checkReadRecords("000", numRecords);
+
+ // Insert with original schema is allowed now
+ insertBatch(hoodieWriteConfig, client, "007", "003", numRecords,
HoodieWriteClient::insert,
+ false, true, numRecords, 2 * numRecords, 1);
+ checkReadRecords("000", 2 * numRecords);
+
+ // Update with original schema is allowed now
+ updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
+ initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, false,
true,
+ numUpdateRecords, 2 * numRecords, 5);
+ checkReadRecords("000", 2 * numRecords);
+ }
+
/**
 * Asserts that {@code numExpectedRecords} records can be read from the table for commits
 * after {@code instantTime}.
 *
 * Only COPY_ON_WRITE tables are actually verified; for MERGE_ON_READ the check is a
 * no-op (see the TODO below for why the read path cannot be validated yet).
 *
 * @param instantTime        exclusive lower bound instant to read records since
 * @param numExpectedRecords expected record count
 */
private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException {
  if (tableType == HoodieTableType.COPY_ON_WRITE) {
    HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitTimeline();
    assertEquals(numExpectedRecords, HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, instantTime).count());
  } else {
    // TODO: This code fails to read records under the following conditions:
    // 1. No parquet files yet (i.e. no compaction done yet)
    // 2. Log file but no base file with the same FileID
    /*
    FileStatus[] allFiles = HoodieTestUtils.listAllDataAndLogFilesInPath(metaClient.getFs(), basePath);
    HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitsTimeline();
    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline, allFiles);
    List<String> dataFiles = fsView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());

    Configuration conf = new Configuration();
    String absTableName = "hoodie." + metaClient.getTableConfig().getTableName();
    conf.set(absTableName + ".consume.mode", "INCREMENTAL");
    conf.set(absTableName + ".consume.start.timestamp", instantTime);
    conf.set(absTableName + ".consume.max.commits", "-1");
    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath, conf);
    assertEquals(recordsRead.size(), numExpectedRecords);
    */
  }
}
+
+ private void checkLatestDeltaCommit(String instantTime) {
+ HoodieTimeline timeline =
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
assertTrue(timeline.lastInstant().get().getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
+
assertTrue(timeline.lastInstant().get().getTimestamp().equals(instantTime));
+ }
+
+ private List<HoodieRecord> generateInsertsWithSchema(String commitTime, int
numRecords, String schemaStr) {
+ HoodieTestDataGenerator gen =
schemaStr.equals(TRIP_EXAMPLE_SCHEMA_EVOLVED) ? dataGenEvolved :
dataGenDevolved;
+ List<HoodieRecord> records = gen.generateInserts(commitTime, numRecords);
+ return convertToSchema(records, schemaStr);
+ }
+
+ private List<HoodieRecord> generateUpdatesWithSchema(String commitTime, int
numRecords, String schemaStr) {
+ HoodieTestDataGenerator gen =
schemaStr.equals(TRIP_EXAMPLE_SCHEMA_EVOLVED) ? dataGenEvolved :
dataGenDevolved;
+ List<HoodieRecord> records = gen.generateUniqueUpdates(commitTime,
numRecords);
+ return convertToSchema(records, schemaStr);
+ }
+
+ private List<HoodieRecord> convertToSchema(List<HoodieRecord> records,
String schemaStr) {
+ Schema newSchema = new Schema.Parser().parse(schemaStr);
+ return records.stream().map(r -> {
+ HoodieKey key = r.getKey();
+ GenericRecord payload;
+ try {
+ payload =
(GenericRecord)r.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
+ GenericRecord newPayload =
HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(payload, newSchema);
+ return new HoodieRecord(key, new
TestRawTripPayload(newPayload.toString(), key.getRecordKey(),
key.getPartitionPath(), schemaStr));
+ } catch (IOException e) {
+ throw new RuntimeException("Conversion to new schema failed");
+ }
+ }).collect(Collectors.toList());
+ }
+
/**
 * Builds a write config for the given avro schema with in-memory indexing and avro
 * schema validation enabled, so that writes with an incompatible schema are rejected.
 */
private HoodieWriteConfig getWriteConfig(String schema) {
  return getConfigBuilder(schema)
      .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
      .withAvroSchemaValidate(true)
      .build();
}
+
/**
 * Returns the table type (COW or MOR) the current test run is parameterized with.
 */
protected HoodieTableType getTableType() {
  return tableType;
}
+}
diff --git
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index 8e9036a..65b567c 100644
---
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -662,6 +662,23 @@ public class HoodieTestDataGenerator {
return result.stream();
}
/**
 * Removes the given key from the generator's in-memory state for TRIP_EXAMPLE_SCHEMA,
 * so that later generateUpdates/generateDeletes calls never reference it.
 *
 * Removal uses a swap-with-last scheme: the slot of the matched key is overwritten with
 * the entry stored at index (numExistingKeys - 1), that last index is dropped, and the
 * key count is decremented. This assumes the map keys form a dense index range
 * 0..numExistingKeys-1 — TODO confirm against how existingKeysBySchema is populated.
 *
 * @param key the record key to remove
 * @return true if the key was present and removed, false otherwise
 */
public boolean deleteExistingKeyIfPresent(HoodieKey key) {
  Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
  Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
  for (Map.Entry<Integer, KeyPartition> entry: existingKeys.entrySet()) {
    if (entry.getValue().key.equals(key)) {
      int index = entry.getKey();
      // Swap-with-last, then shrink: safe w.r.t. ConcurrentModificationException only
      // because we return immediately and never touch the iterator again.
      existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
      existingKeys.remove(numExistingKeys - 1);
      numExistingKeys--;
      numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, numExistingKeys);
      return true;
    }
  }

  return false;
}
+
public String[] getPartitionPaths() {
return partitionPaths;
}
@@ -679,4 +696,4 @@ public class HoodieTestDataGenerator {
public void close() {
existingKeysBySchema.clear();
}
-}
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index af44a31..20b49e2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -97,6 +97,14 @@ public class HoodieAvroUtils {
|| HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName);
}
/**
 * Creates the schema used for writing by adding the Hoodie metadata fields
 * (record key, partition path, commit time, etc.) to the given schema.
 */
public static Schema createHoodieWriteSchema(Schema originalSchema) {
  return HoodieAvroUtils.addMetadataFields(originalSchema);
}
+
/**
 * Convenience overload: parses the schema string and delegates to
 * {@link #createHoodieWriteSchema(Schema)}.
 */
public static Schema createHoodieWriteSchema(String originalSchema) {
  return createHoodieWriteSchema(new Schema.Parser().parse(originalSchema));
}
+
/**
* Adds the Hoodie metadata fields to the given schema.
*/
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
new file mode 100644
index 0000000..0bab862
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Helper class to read schema from data files and log files and to convert it
between different formats.
+ */
+public class TableSchemaResolver {
+
+ private static final Logger LOG =
LogManager.getLogger(TableSchemaResolver.class);
+ private HoodieTableMetaClient metaClient;
+
+ public TableSchemaResolver(HoodieTableMetaClient metaClient) {
+ this.metaClient = metaClient;
+ }
+
+ /**
+ * Gets the schema for a hoodie table. Depending on the type of table, read
from any file written in the latest
+ * commit. We will assume that the schema has not changed within a single
atomic write.
+ *
+ * @return Parquet schema for this table
+ * @throws Exception
+ */
+ public MessageType getDataSchema() throws Exception {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+
+ try {
+ switch (metaClient.getTableType()) {
+ case COPY_ON_WRITE:
+ // If this is COW, get the last commit and read the schema from a
file written in the
+ // last commit
+ HoodieInstant lastCommit =
+
activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(()
-> new InvalidTableException(metaClient.getBasePath()));
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+ .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(),
HoodieCommitMetadata.class);
+ String filePath =
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
+ .orElseThrow(() -> new IllegalArgumentException("Could not find
any data file written for commit "
+ + lastCommit + ", could not get schema for table " +
metaClient.getBasePath() + ", Metadata :"
+ + commitMetadata));
+ return readSchemaFromBaseFile(new Path(filePath));
+ case MERGE_ON_READ:
+ // If this is MOR, depending on whether the latest commit is a delta
commit or
+ // compaction commit
+ // Get a datafile written and get the schema from that file
+ Option<HoodieInstant> lastCompactionCommit =
+
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
+ LOG.info("Found the last compaction commit as " +
lastCompactionCommit);
+
+ Option<HoodieInstant> lastDeltaCommit;
+ if (lastCompactionCommit.isPresent()) {
+ lastDeltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
+ .findInstantsAfter(lastCompactionCommit.get().getTimestamp(),
Integer.MAX_VALUE).lastInstant();
+ } else {
+ lastDeltaCommit =
+
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
+ }
+ LOG.info("Found the last delta commit " + lastDeltaCommit);
+
+ if (lastDeltaCommit.isPresent()) {
+ HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
+ // read from the log file wrote
+ commitMetadata =
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),
+ HoodieCommitMetadata.class);
+ Pair<String, HoodieFileFormat> filePathWithFormat =
+
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
+ .filter(s ->
s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny()
+ .map(f -> Pair.of(f,
HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
+ // No Log files in Delta-Commit. Check if there are any
parquet files
+ return
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
+ .filter(s ->
s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
+ .findAny().map(f -> Pair.of(f,
HoodieFileFormat.PARQUET)).orElseThrow(() ->
+ new IllegalArgumentException("Could not find any
data file written for commit "
+ + lastDeltaInstant + ", could not get schema for
table " + metaClient.getBasePath()
+ + ", CommitMetadata :" + commitMetadata));
+ });
+ switch (filePathWithFormat.getRight()) {
+ case HOODIE_LOG:
+ return readSchemaFromLogFile(lastCompactionCommit, new
Path(filePathWithFormat.getLeft()));
+ case PARQUET:
+ return readSchemaFromBaseFile(new
Path(filePathWithFormat.getLeft()));
+ default:
+ throw new IllegalArgumentException("Unknown file format :" +
filePathWithFormat.getRight()
+ + " for file " + filePathWithFormat.getLeft());
+ }
+ } else {
+ return readSchemaFromLastCompaction(lastCompactionCommit);
+ }
+ default:
+ LOG.error("Unknown table type " + metaClient.getTableType());
+ throw new InvalidTableException(metaClient.getBasePath());
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Failed to read data schema", e);
+ }
+ }
+
+ /**
+ * Gets the schema for a hoodie table in Avro format.
+ *
+ * @return Avro schema for this table
+ * @throws Exception
+ */
+ public Schema getTableSchema() throws Exception {
+ return convertParquetSchemaToAvro(getDataSchema());
+ }
+
+ /**
+ * Gets the schema for a hoodie table in Avro format from the
HoodieCommitMetadata of the last commit.
+ *
+ * @return Avro schema for this table
+ * @throws Exception
+ */
+ public Schema getTableSchemaFromCommitMetadata() throws Exception {
+ try {
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ byte[] data =
timeline.getInstantDetails(timeline.lastInstant().get()).get();
+ HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data,
HoodieCommitMetadata.class);
+ String existingSchemaStr =
metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
+ return new Schema.Parser().parse(existingSchemaStr);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to read schema from commit metadata",
e);
+ }
+ }
+
+ /**
+ * Convert a parquet scheme to the avro format.
+ *
+ * @param parquetSchema The parquet schema to convert
+ * @return The converted avro schema
+ */
+ public Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
+ AvroSchemaConverter avroSchemaConverter = new
AvroSchemaConverter(metaClient.getHadoopConf());
+ return avroSchemaConverter.convert(parquetSchema);
+ }
+
+ /**
+ * HUDI specific validation of schema evolution. Ensures that a newer schema
can be used for the dataset by
+ * checking if the data written using the old schema can be read using the
new schema.
+ *
+ * HUDI requires a Schema to be specified in HoodieWriteConfig and is used
by the HoodieWriteClient to
+ * create the records. The schema is also saved in the data files (parquet
format) and log files (avro format).
+ * Since a schema is required each time new data is ingested into a HUDI
dataset, schema can be evolved over time.
+ *
+ * New Schema is compatible only if:
+ * A1. There is no change in schema
+ * A2. A field has been added and it has a default value specified
+ *
+ * New Schema is incompatible if:
+ * B1. A field has been deleted
+ * B2. A field has been renamed (treated as delete + add)
+ * B3. A field's type has changed to be incompatible with the older type
+ *
+ * Issue with org.apache.avro.SchemaCompatibility:
+ * org.apache.avro.SchemaCompatibility checks schema compatibility between
a writer schema (which originally wrote
+ * the AVRO record) and a readerSchema (with which we are reading the
record). It ONLY guarantees that that each
+ * field in the reader record can be populated from the writer record.
Hence, if the reader schema is missing a
+ * field, it is still compatible with the writer schema.
+ *
+ * In other words, org.apache.avro.SchemaCompatibility was written to
guarantee that we can read the data written
+ * earlier. It does not guarantee schema evolution for HUDI (B1 above).
+ *
+ * Implementation: This function implements specific HUDI specific checks
(listed below) and defers the remaining
+ * checks to the org.apache.avro.SchemaCompatibility code.
+ *
+ * Checks:
+ * C1. If there is no change in schema: success
+ * C2. If a field has been deleted in new schema: failure
+ * C3. If a field has been added in new schema: it should have default value
specified
+ * C4. If a field has been renamed(treated as delete + add): failure
+ * C5. If a field type has changed: failure
+ *
+ * @param oldSchema Older schema to check.
+ * @param newSchema Newer schema to check.
+ * @return True if the schema validation is successful
+ */
+ public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema)
{
+ if (oldSchema.getType() == newSchema.getType() && newSchema.getType() ==
Schema.Type.RECORD) {
+ // record names must match:
+ if (!SchemaCompatibility.schemaNameEquals(oldSchema, newSchema)) {
+ return false;
+ }
+
+ // Check that each field in the oldSchema can populated the newSchema
+ for (final Field oldSchemaField : oldSchema.getFields()) {
+ final Field newSchemaField =
SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField);
+ if (newSchemaField == null) {
+ // C4 or C2: newSchema does not correspond to any field in the
oldSchema
+ return false;
+ } else {
+ if (!isSchemaCompatible(oldSchemaField.schema(),
newSchemaField.schema())) {
+ // C5: The fields do not have a compatible type
+ return false;
+ }
+ }
+ }
+
+ // Check that new fields added in newSchema have default values as they
will not be
+ // present in oldSchema and hence cannot be populated on reading records
from existing data.
+ for (final Field newSchemaField : newSchema.getFields()) {
+ final Field oldSchemaField =
SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField);
+ if (oldSchemaField == null) {
+ if (newSchemaField.defaultValue() == null) {
+ // C3: newly added field in newSchema does not have a default value
+ return false;
+ }
+ }
+ }
+
+ // All fields in the newSchema record can be populated from the
oldSchema record
+ return true;
+ } else {
+ // Use the checks implemented by
+ org.apache.avro.SchemaCompatibility.SchemaPairCompatibility compatResult
=
+
org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(oldSchema,
newSchema);
+ return compatResult.getType() ==
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+ }
+ }
+
+ public static boolean isSchemaCompatible(String oldSchema, String newSchema)
{
+ return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new
Schema.Parser().parse(newSchema));
+ }
+
+ /**
+ * Read the parquet schema from a parquet File.
+ */
+ public MessageType readSchemaFromBaseFile(Path parquetFilePath) throws
IOException {
+ LOG.info("Reading schema from " + parquetFilePath);
+
+ FileSystem fs = metaClient.getRawFs();
+ if (!fs.exists(parquetFilePath)) {
+ throw new IllegalArgumentException(
+ "Failed to read schema from data file " + parquetFilePath + ". File
does not exist.");
+ }
+ ParquetMetadata fileFooter =
+ ParquetFileReader.readFooter(fs.getConf(), parquetFilePath,
ParquetMetadataConverter.NO_FILTER);
+ return fileFooter.getFileMetaData().getSchema();
+ }
+
+ /**
+ * Read schema from a data file from the last compaction commit done.
+ * @throws Exception
+ */
+ public MessageType readSchemaFromLastCompaction(Option<HoodieInstant>
lastCompactionCommitOpt) throws Exception {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+
+ HoodieInstant lastCompactionCommit =
lastCompactionCommitOpt.orElseThrow(() -> new Exception(
+ "Could not read schema from last compaction, no compaction commits
found on path " + metaClient));
+
+ // Read from the compacted file wrote
+ HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata
+
.fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(),
HoodieCommitMetadata.class);
+ String filePath =
compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
+ .orElseThrow(() -> new IllegalArgumentException("Could not find any
data file written for compaction "
+ + lastCompactionCommit + ", could not get schema for table " +
metaClient.getBasePath()));
+ return readSchemaFromBaseFile(new Path(filePath));
+ }
+
+ /**
+ * Read the schema from the log file on path.
+ *
+ * @return
+ */
+ public MessageType readSchemaFromLogFile(Path path) throws IOException {
+ FileSystem fs = metaClient.getRawFs();
+ Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path),
null);
+ HoodieAvroDataBlock lastBlock = null;
+ while (reader.hasNext()) {
+ HoodieLogBlock block = reader.next();
+ if (block instanceof HoodieAvroDataBlock) {
+ lastBlock = (HoodieAvroDataBlock) block;
+ }
+ }
+ reader.close();
+ if (lastBlock != null) {
+ return new AvroSchemaConverter().convert(lastBlock.getSchema());
+ }
+ return null;
+ }
+
+ /**
+ * Read the schema from the log file on path.
+ * @throws Exception
+ */
+ public MessageType readSchemaFromLogFile(Option<HoodieInstant>
lastCompactionCommitOpt, Path path)
+ throws Exception {
+ MessageType messageType = readSchemaFromLogFile(path);
+ // Fall back to read the schema from last compaction
+ if (messageType == null) {
+ LOG.info("Falling back to read the schema from last compaction " +
lastCompactionCommitOpt);
+ return readSchemaFromLastCompaction(lastCompactionCommitOpt);
+ }
+ return messageType;
+ }
+
+ /**
+ * Read the schema from the log file on path.
+ *
+ * @return
+ */
+ public static MessageType readSchemaFromLogFile(FileSystem fs, Path path)
throws IOException {
+ Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path),
null);
+ HoodieAvroDataBlock lastBlock = null;
+ while (reader.hasNext()) {
+ HoodieLogBlock block = reader.next();
+ if (block instanceof HoodieAvroDataBlock) {
+ lastBlock = (HoodieAvroDataBlock) block;
+ }
+ }
+ reader.close();
+ if (lastBlock != null) {
+ return new AvroSchemaConverter().convert(lastBlock.getSchema());
+ }
+ return null;
+ }
+}
diff --git
a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index e6572c9..273635c 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -25,7 +25,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.hive.util.SchemaUtil;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
@@ -158,7 +158,7 @@ public class HiveSyncTool {
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema =
hoodieHiveClient.getTableSchema(tableName);
- SchemaDifference schemaDiff = SchemaUtil.getSchemaDifference(schema,
tableSchema, cfg.partitionFields);
+ SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema,
tableSchema, cfg.partitionFields);
if (!schemaDiff.isEmpty()) {
LOG.info("Schema difference found for " + tableName);
hoodieHiveClient.updateTableDefinition(tableName, schema);
diff --git
a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 55a4968..9f1a040 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -21,18 +21,14 @@ package org.apache.hudi.hive;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.InvalidTableException;
-import org.apache.hudi.hive.util.SchemaUtil;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -49,9 +45,6 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.jdbc.HiveDriver;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.thrift.TException;
@@ -74,7 +67,7 @@ public class HoodieHiveClient {
private static final String HOODIE_LAST_COMMIT_TIME_SYNC =
"last_commit_time_sync";
// Make sure we have the hive JDBC driver in classpath
private static String driverName = HiveDriver.class.getName();
- private static final String HIVE_ESCAPE_CHARACTER =
SchemaUtil.HIVE_ESCAPE_CHARACTER;
+ private static final String HIVE_ESCAPE_CHARACTER =
HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
static {
try {
@@ -250,7 +243,7 @@ public class HoodieHiveClient {
void updateTableDefinition(String tableName, MessageType newSchema) {
try {
- String newSchemaStr = SchemaUtil.generateSchemaString(newSchema,
syncConfig.partitionFields);
+ String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema,
syncConfig.partitionFields);
// Cascade clause should not be present for non-partitioned tables
String cascadeClause = syncConfig.partitionFields.size() > 0 ? "
cascade" : "";
StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE
").append(HIVE_ESCAPE_CHARACTER)
@@ -268,7 +261,7 @@ public class HoodieHiveClient {
void createTable(String tableName, MessageType storageSchema, String
inputFormatClass, String outputFormatClass, String serdeClass) {
try {
String createSQLQuery =
- SchemaUtil.generateCreateDDL(tableName, storageSchema, syncConfig,
inputFormatClass, outputFormatClass, serdeClass);
+ HiveSchemaUtil.generateCreateDDL(tableName, storageSchema,
syncConfig, inputFormatClass, outputFormatClass, serdeClass);
LOG.info("Creating table with " + createSQLQuery);
updateHiveSQL(createSQLQuery);
} catch (IOException e) {
@@ -340,123 +333,15 @@ public class HoodieHiveClient {
*
* @return Parquet schema for this table
*/
- @SuppressWarnings("WeakerAccess")
public MessageType getDataSchema() {
try {
- switch (tableType) {
- case COPY_ON_WRITE:
- // If this is COW, get the last commit and read the schema from a
file written in the
- // last commit
- HoodieInstant lastCommit =
- activeTimeline.lastInstant().orElseThrow(() -> new
InvalidTableException(syncConfig.basePath));
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
- .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(),
HoodieCommitMetadata.class);
- String filePath =
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
- .orElseThrow(() -> new IllegalArgumentException("Could not find
any data file written for commit "
- + lastCommit + ", could not get schema for table " +
metaClient.getBasePath() + ", Metadata :"
- + commitMetadata));
- return readSchemaFromBaseFile(new Path(filePath));
- case MERGE_ON_READ:
- // If this is MOR, depending on whether the latest commit is a delta
commit or
- // compaction commit
- // Get a datafile written and get the schema from that file
- Option<HoodieInstant> lastCompactionCommit =
-
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
- LOG.info("Found the last compaction commit as " +
lastCompactionCommit);
-
- Option<HoodieInstant> lastDeltaCommit;
- if (lastCompactionCommit.isPresent()) {
- lastDeltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
- .findInstantsAfter(lastCompactionCommit.get().getTimestamp(),
Integer.MAX_VALUE).lastInstant();
- } else {
- lastDeltaCommit =
-
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
- }
- LOG.info("Found the last delta commit " + lastDeltaCommit);
-
- if (lastDeltaCommit.isPresent()) {
- HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
- // read from the log file wrote
- commitMetadata =
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),
- HoodieCommitMetadata.class);
- Pair<String, HoodieFileFormat> filePathWithFormat =
-
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
- .filter(s ->
s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny()
- .map(f -> Pair.of(f,
HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
- // No Log files in Delta-Commit. Check if there are any
parquet files
- return
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
- .filter(s ->
s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
- .findAny().map(f -> Pair.of(f,
HoodieFileFormat.PARQUET)).orElseThrow(() ->
- new IllegalArgumentException("Could not find any
data file written for commit "
- + lastDeltaInstant + ", could not get schema for
table " + metaClient.getBasePath()
- + ", CommitMetadata :" + commitMetadata));
- });
- switch (filePathWithFormat.getRight()) {
- case HOODIE_LOG:
- return readSchemaFromLogFile(lastCompactionCommit, new
Path(filePathWithFormat.getLeft()));
- case PARQUET:
- return readSchemaFromBaseFile(new
Path(filePathWithFormat.getLeft()));
- default:
- throw new IllegalArgumentException("Unknown file format :" +
filePathWithFormat.getRight()
- + " for file " + filePathWithFormat.getLeft());
- }
- } else {
- return readSchemaFromLastCompaction(lastCompactionCommit);
- }
- default:
- LOG.error("Unknown table type " + tableType);
- throw new InvalidTableException(syncConfig.basePath);
- }
- } catch (IOException e) {
+ return new TableSchemaResolver(metaClient).getDataSchema();
+ } catch (Exception e) {
throw new HoodieHiveSyncException("Failed to read data schema", e);
}
}
/**
- * Read schema from a data file from the last compaction commit done.
- */
- private MessageType readSchemaFromLastCompaction(Option<HoodieInstant>
lastCompactionCommitOpt) throws IOException {
- HoodieInstant lastCompactionCommit =
lastCompactionCommitOpt.orElseThrow(() -> new HoodieHiveSyncException(
- "Could not read schema from last compaction, no compaction commits
found on path " + syncConfig.basePath));
-
- // Read from the compacted file wrote
- HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata
-
.fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(),
HoodieCommitMetadata.class);
- String filePath =
compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
- .orElseThrow(() -> new IllegalArgumentException("Could not find any
data file written for compaction "
- + lastCompactionCommit + ", could not get schema for table " +
metaClient.getBasePath()));
- return readSchemaFromBaseFile(new Path(filePath));
- }
-
- /**
- * Read the schema from the log file on path.
- */
- private MessageType readSchemaFromLogFile(Option<HoodieInstant>
lastCompactionCommitOpt, Path path)
- throws IOException {
- MessageType messageType = SchemaUtil.readSchemaFromLogFile(fs, path);
- // Fall back to read the schema from last compaction
- if (messageType == null) {
- LOG.info("Falling back to read the schema from last compaction " +
lastCompactionCommitOpt);
- return readSchemaFromLastCompaction(lastCompactionCommitOpt);
- }
- return messageType;
- }
-
- /**
- * Read the parquet schema from a parquet File.
- */
- private MessageType readSchemaFromBaseFile(Path parquetFilePath) throws
IOException {
- LOG.info("Reading schema from " + parquetFilePath);
- if (!fs.exists(parquetFilePath)) {
- throw new IllegalArgumentException(
- "Failed to read schema from data file " + parquetFilePath + ". File
does not exist.");
- }
- ParquetMetadata fileFooter =
- ParquetFileReader.readFooter(fs.getConf(), parquetFilePath,
ParquetMetadataConverter.NO_FILTER);
- return fileFooter.getFileMetaData().getSchema();
- }
-
- /**
* @return true if the configured table exists
*/
public boolean doesTableExist(String tableName) {
diff --git
a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
similarity index 93%
rename from
hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
rename to
hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index 499f2a7..7fd64bd 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -18,20 +18,12 @@
package org.apache.hudi.hive.util;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.SchemaDifference;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
@@ -50,9 +42,9 @@ import java.util.Set;
/**
* Schema Utilities.
*/
-public class SchemaUtil {
+public class HiveSchemaUtil {
- private static final Logger LOG = LogManager.getLogger(SchemaUtil.class);
+ private static final Logger LOG = LogManager.getLogger(HiveSchemaUtil.class);
public static final String HIVE_ESCAPE_CHARACTER = "`";
/**
@@ -424,25 +416,4 @@ public class SchemaUtil {
// Dont do that
return "String";
}
-
- /**
- * Read the schema from the log file on path.
- *
- * @return
- */
- public static MessageType readSchemaFromLogFile(FileSystem fs, Path path)
throws IOException {
- Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path),
null);
- HoodieAvroDataBlock lastBlock = null;
- while (reader.hasNext()) {
- HoodieLogBlock block = reader.next();
- if (block instanceof HoodieAvroDataBlock) {
- lastBlock = (HoodieAvroDataBlock) block;
- }
- }
- reader.close();
- if (lastBlock != null) {
- return new AvroSchemaConverter().convert(lastBlock.getSchema());
- }
- return null;
- }
}
diff --git
a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index d92c9d9..14dfada 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SchemaTestUtil;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.hive.util.SchemaUtil;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.parquet.schema.MessageType;
@@ -79,7 +79,7 @@ public class TestHiveSyncTool {
.optional(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list").named("int_list")
.named("ArrayOfInts");
- String schemaString = SchemaUtil.generateSchemaString(schema);
+ String schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`int_list` ARRAY< int>", schemaString);
// A array of arrays
@@ -87,14 +87,14 @@ public class TestHiveSyncTool {
.as(OriginalType.LIST).repeatedGroup().required(PrimitiveType.PrimitiveTypeName.INT32).named("element")
.named("list").named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts");
- schemaString = SchemaUtil.generateSchemaString(schema);
+ schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString);
// A list of integers
schema =
Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeated(PrimitiveType.PrimitiveTypeName.INT32)
.named("element").named("int_list").named("ArrayOfInts");
- schemaString = SchemaUtil.generateSchemaString(schema);
+ schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`int_list` ARRAY< int>", schemaString);
// A list of structs with two fields
@@ -102,7 +102,7 @@ public class TestHiveSyncTool {
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").required(PrimitiveType.PrimitiveTypeName.INT32)
.named("num").named("element").named("tuple_list").named("ArrayOfTuples");
- schemaString = SchemaUtil.generateSchemaString(schema);
+ schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>",
schemaString);
// A list of structs with a single field
@@ -112,7 +112,7 @@ public class TestHiveSyncTool {
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("array").named("one_tuple_list")
.named("ArrayOfOneTuples");
- schemaString = SchemaUtil.generateSchemaString(schema);
+ schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>",
schemaString);
// A list of structs with a single field
@@ -122,7 +122,7 @@ public class TestHiveSyncTool {
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list_tuple")
.named("one_tuple_list").named("ArrayOfOneTuples2");
- schemaString = SchemaUtil.generateSchemaString(schema);
+ schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>",
schemaString);
// A list of structs with a single field
@@ -132,7 +132,7 @@ public class TestHiveSyncTool {
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list").named("one_tuple_list")
.named("ArrayOfOneTuples3");
- schemaString = SchemaUtil.generateSchemaString(schema);
+ schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< binary>", schemaString);
// A list of maps
@@ -141,7 +141,7 @@ public class TestHiveSyncTool {
.as(OriginalType.UTF8).named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32).named("int_value")
.named("key_value").named("array").named("map_list").named("ArrayOfMaps");
- schemaString = SchemaUtil.generateSchemaString(schema);
+ schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
}