This is an automated email from the ASF dual-hosted git repository.
mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4c218231bd [HUDI-5244] Fix bugs in schema evolution client with lost
operation field and not found schema (#7248)
4c218231bd is described below
commit 4c218231bdb2a24310145b721cf87ba7b0f1534a
Author: Alexander Trushev <[email protected]>
AuthorDate: Mon Nov 21 10:54:01 2022 +0700
[HUDI-5244] Fix bugs in schema evolution client with lost operation field
and not found schema (#7248)
* [HUDI-5244] Fix bugs in schema evolution client with lost operation field
and not found schema
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 21 +++--
.../action/commit/TestSchemaEvolutionClient.java | 96 ++++++++++++++++++++++
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 4 +
3 files changed, 114 insertions(+), 7 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 4a3f6bd311..133dfce9e9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -280,7 +280,7 @@ public abstract class BaseHoodieWriteClient<T extends
HoodieRecordPayload, I, K,
FileBasedInternalSchemaStorageManager schemasManager = new
FileBasedInternalSchemaStorageManager(table.getMetaClient());
if (!historySchemaStr.isEmpty() ||
Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key())))
{
InternalSchema internalSchema;
- Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new
Schema.Parser().parse(config.getSchema()));
+ Schema avroSchema =
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(),
config.allowOperationMetadataField());
if (historySchemaStr.isEmpty()) {
internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
internalSchema.setSchemaId(Long.parseLong(instantTime));
@@ -1762,16 +1762,13 @@ public abstract class BaseHoodieWriteClient<T extends
HoodieRecordPayload, I, K,
private Pair<InternalSchema, HoodieTableMetaClient>
getInternalSchemaAndMetaClient() {
HoodieTableMetaClient metaClient = createMetaClient(true);
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
- Option<InternalSchema> internalSchemaOption =
schemaUtil.getTableInternalSchemaFromCommitMetadata();
- if (!internalSchemaOption.isPresent()) {
- throw new HoodieException(String.format("cannot find schema for current
table: %s", config.getBasePath()));
- }
- return Pair.of(internalSchemaOption.get(), metaClient);
+ return Pair.of(getInternalSchema(schemaUtil), metaClient); // lookup and its "cannot find schema" failure are delegated to getInternalSchema
}
private void commitTableChange(InternalSchema newSchema,
HoodieTableMetaClient metaClient) {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
- String historySchemaStr =
schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+ String historySchemaStr =
schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElseGet(
+ () -> SerDeHelper.inheritSchemas(getInternalSchema(schemaUtil), ""));
Schema schema = AvroInternalSchemaConverter.convert(newSchema,
config.getTableName());
String commitActionType =
CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA,
metaClient.getTableType());
String instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -1793,4 +1790,14 @@ public abstract class BaseHoodieWriteClient<T extends
HoodieRecordPayload, I, K,
schemasManager.persistHistorySchemaStr(instantTime,
SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta),
commitActionType);
}
+
+ private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) { // schema from commit metadata, else converted from the table's Avro schema
+   return schemaUtil.getTableInternalSchemaFromCommitMetadata().orElseGet(() -> {
+     try {
+       return AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema());
+     } catch (Exception e) {
+       throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()), e); // chain the cause so the underlying failure stays visible
+     }
+   });
+ }
}
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestSchemaEvolutionClient.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestSchemaEvolutionClient.java
new file mode 100644
index 0000000000..bda4a3267d
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestSchemaEvolutionClient.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.testutils.HoodieJavaClientTestBase;
+
+import org.apache.avro.Schema;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for schema evolution client api.
+ */
+public class TestSchemaEvolutionClient extends HoodieJavaClientTestBase {
+
+ private static final Schema SCHEMA =
getSchemaFromResource(TestSchemaEvolutionClient.class, "/exampleSchema.avsc"); // write schema handed to the client config below
+
+ @BeforeEach
+ public void setUpClient() throws IOException { // builds a client, stores it on the writeClient field, then writes one record
+ HoodieJavaWriteClient<RawTripTestPayload> writeClient = getWriteClient();
+ this.writeClient = writeClient;
+ prepareTable(writeClient);
+ }
+
+ @AfterEach
+ public void closeClient() { // null check guards against a setUpClient that failed before assigning the field
+ if (writeClient != null) {
+ writeClient.close();
+ }
+ }
+
+ @Test
+ public void testUpdateColumnType() { // changes "number" to long, then re-reads the field type from commit metadata
+ writeClient.updateColumnType("number", Types.LongType.get());
+ assertEquals(Types.LongType.get(), getFieldByName("number").type());
+ }
+
+ private HoodieJavaWriteClient<RawTripTestPayload> getWriteClient() { // Java-engine client on basePath configured with SCHEMA
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withEngineType(EngineType.JAVA)
+ .withPath(basePath)
+ .withSchema(SCHEMA.toString())
+ .build();
+ return new HoodieJavaWriteClient<>(context, config);
+ }
+
+ private void prepareTable(HoodieJavaWriteClient<RawTripTestPayload>
writeClient) throws IOException { // inserts a single JSON record under instant "1"
+ String commitTime = "1";
+ writeClient.startCommitWithTime(commitTime);
+ //language=JSON
+ String jsonRow = "{\"_row_key\": \"1\", \"time\":
\"2000-01-01T00:00:00.000Z\", \"number\": 1}";
+ RawTripTestPayload payload = new RawTripTestPayload(jsonRow);
+ HoodieAvroRecord<RawTripTestPayload> record = new HoodieAvroRecord<>(
+ new HoodieKey(payload.getRowKey(), payload.getPartitionPath()),
payload);
+ writeClient.insert(Collections.singletonList(record), commitTime);
+ }
+
+ private Types.Field getFieldByName(String fieldName) { // looks the field up in the internal schema read from commit metadata
+ return new TableSchemaResolver(metaClient)
+ .getTableInternalSchemaFromCommitMetadata()
+ .get() // NOTE(review): unchecked get; assumes the preceding schema change stored an internal schema
+ .findField(fieldName);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 2f226b2d46..ef2de67347 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -184,6 +184,10 @@ public class HoodieAvroUtils {
return createHoodieWriteSchema(new Schema.Parser().parse(originalSchema));
}
+ public static Schema createHoodieWriteSchema(String originalSchema, boolean
withOperationField) { // parses the schema string and passes it with the flag to addMetadataFields
+ return addMetadataFields(new Schema.Parser().parse(originalSchema),
withOperationField);
+ }
+
/**
* Adds the Hoodie metadata fields to the given schema.
*