nsivabalan commented on a change in pull request #2666:
URL: https://github.com/apache/hudi/pull/2666#discussion_r605601097
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -79,6 +80,7 @@
public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
public static final String BASE_PATH_PROP = "hoodie.base.path";
public static final String AVRO_SCHEMA = "hoodie.avro.schema";
+ public static final String LAST_AVRO_SCHEMA = "hoodie.last.avro.schema";
Review comment:
Maybe we can call this "latest table schema" instead.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -106,7 +110,7 @@
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier) {
- super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
+ super(config, instantTime, partitionPath, fileId, hoodieTable, getWriterSchemaIncludingAndExcludingMetadataPair(config, hoodieTable), taskContextSupplier);
Review comment:
Can you help me understand something? Here we have HoodieWriteConfig in two
places: as a separate entity (the 1st constructor argument) and via
hoodieTable.getConfig(). I see that within
getWriterSchemaIncludingAndExcludingMetadataPair(...) we update lastSchema in
the HoodieWriteConfig, which updates the 1st argument. But table.getConfig()
may not be updated, right?
If the above is right, how can we rely on
table.getConfig().getLastSchema() in the *MergeHelper classes?
Maybe I am missing something. Can you shed some light?
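To make the question concrete: whether table.getConfig() sees the update depends only on whether the table holds the same HoodieWriteConfig instance that is passed as the 1st constructor argument. Below is a minimal sketch with hypothetical stand-ins (WriteConfig, Table and resolveAndSetLastSchema are illustrative, not Hudi's classes) showing both cases; I have not verified which case applies to the handles in this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for HoodieWriteConfig and HoodieTable, reduced to the
// single property under discussion. These are NOT Hudi's real classes.
public class ConfigAliasingDemo {

  static final class WriteConfig {
    private final Map<String, String> props = new HashMap<>();

    WriteConfig() {
    }

    // Copy constructor: the copy is an independent object.
    WriteConfig(WriteConfig other) {
      props.putAll(other.props);
    }

    void setLastSchema(String schema) {
      props.put("hoodie.last.avro.schema", schema);
    }

    String getLastSchema() {
      return props.get("hoodie.last.avro.schema");
    }
  }

  static final class Table {
    private final WriteConfig config;

    Table(WriteConfig config) {
      this.config = config;
    }

    WriteConfig getConfig() {
      return config;
    }
  }

  // Mirrors the PR's helper: it mutates only the config object it is handed.
  static void resolveAndSetLastSchema(WriteConfig config, String tableSchema) {
    config.setLastSchema(tableSchema);
  }

  // Returns what table.getConfig().getLastSchema() sees after the handle-side update.
  static String lastSchemaSeenByTable(boolean tableSharesInstance) {
    WriteConfig handleConfig = new WriteConfig();
    Table table = new Table(tableSharesInstance ? handleConfig : new WriteConfig(handleConfig));
    resolveAndSetLastSchema(handleConfig, "table-schema");
    return table.getConfig().getLastSchema();
  }

  public static void main(String[] args) {
    System.out.println(lastSchemaSeenByTable(true));  // table-schema
    System.out.println(lastSchemaSeenByTable(false)); // null
  }
}
```

If the table ends up with its own copy (second case), the *MergeHelper classes would read null from table.getConfig().getLastSchema().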
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -59,14 +61,24 @@ public static HoodieCommitMetadata buildMetadata(List<HoodieWriteStat> writeStat
Option<Map<String, String>> extraMetadata,
WriteOperationType operationType,
String schemaToStoreInCommit,
- String commitActionType) {
+ String commitActionType,
+ Boolean updatePartialFields,
+ HoodieTableMetaClient metaClient) {
HoodieCommitMetadata commitMetadata = buildMetadataFromStats(writeStats, partitionToReplaceFileIds, commitActionType, operationType);
// add in extra metadata
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach(commitMetadata::addMetadata);
}
+ if (updatePartialFields) {
+ try {
Review comment:
If updatePartialFields is set, can't we rely on config.getLastSchema()
in all places where this method is called, similar to how we pass in
config.getSchema()? Is it not guaranteed that config.getLastSchema() will be
set by the time we reach here when updatePartialFields is true?
I'm trying to see if we can avoid parsing the schema from the table again, since
we have already done it once.
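A minimal sketch of what I mean, using hypothetical stand-ins (resolveTableSchema, prepareWrite, schemaForCommit and runWrite are illustrative names, not Hudi APIs): the table schema is resolved once when the write starts, kept on the config, and the commit path only reads it back, failing loudly if it was never set. The sketch assumes the commit should record the full table schema when updatePartialFields is true; adjust if the intent is different.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: resolve the table schema once and reuse it at commit time.
public class ResolveOnceDemo {

  // Counts calls to the (expensive) resolver stand-in.
  static final AtomicInteger RESOLUTIONS = new AtomicInteger();

  // Stand-in for resolving the table's Avro schema via the meta client.
  static String resolveTableSchema() {
    RESOLUTIONS.incrementAndGet();
    return "full-table-schema";
  }

  // Stand-in for the lastSchema property on HoodieWriteConfig.
  static final class WriteConfig {
    private String lastSchema;

    String getLastSchema() {
      return lastSchema;
    }

    void setLastSchema(String lastSchema) {
      this.lastSchema = lastSchema;
    }
  }

  // Write-handle side: the only place the resolver is called.
  static void prepareWrite(WriteConfig config, boolean updatePartialFields) {
    if (updatePartialFields) {
      config.setLastSchema(resolveTableSchema());
    }
  }

  // Commit side: reads the config instead of taking a metaClient, so no second resolution.
  static String schemaForCommit(WriteConfig config, String incomingSchema, boolean updatePartialFields) {
    if (!updatePartialFields) {
      return incomingSchema;
    }
    String lastSchema = config.getLastSchema();
    if (lastSchema == null) {
      throw new IllegalStateException("updatePartialFields is set but lastSchema was never populated");
    }
    return lastSchema;
  }

  // Runs one simulated write and returns the schema that would be stored in the commit.
  static String runWrite(boolean updatePartialFields) {
    RESOLUTIONS.set(0);
    WriteConfig config = new WriteConfig();
    prepareWrite(config, updatePartialFields);
    return schemaForCommit(config, "partial-schema", updatePartialFields);
  }

  public static void main(String[] args) {
    System.out.println(runWrite(true) + " resolved " + RESOLUTIONS.get() + " time(s)");
  }
}
```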
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload {
Review comment:
Javadocs with an example would be great.
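Something along these lines, perhaps. The wording is only a draft based on my reading of combineAndGetUpdateValue (every field in the schema argument is copied from the incoming record onto the stored one; all other fields keep their stored values), and it assumes schema is the incoming partial schema. The small Map-based model (PartialMergeModel is hypothetical, not part of the PR) just checks that the worked example holds.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Draft Javadoc for PartialUpdatePayload (hypothetical wording).
 *
 * <p>Merges an incoming record that carries only a subset of the table's
 * columns into the record already stored for the same key. Every field in the
 * incoming schema overwrites the stored value (including with null); every
 * other field keeps its stored value.
 *
 * <pre>
 *   stored:   {id=1, name=a, price=10, ts=100}
 *   incoming: {id=1, price=12, ts=200}   (incoming schema: id, price, ts)
 *   result:   {id=1, name=a, price=12, ts=200}
 * </pre>
 */
public class PartialMergeModel {

  // Map-based model of the loop in combineAndGetUpdateValue. Unlike the PR,
  // it copies the stored record instead of mutating it in place.
  static Map<String, Object> merge(Map<String, Object> stored, Map<String, Object> incoming,
      List<String> incomingFields) {
    Map<String, Object> result = new LinkedHashMap<>(stored);
    for (String field : incomingFields) {
      result.put(field, incoming.get(field));
    }
    return result;
  }

  // Reproduces the example from the Javadoc above.
  static Map<String, Object> example() {
    Map<String, Object> stored = new LinkedHashMap<>();
    stored.put("id", 1);
    stored.put("name", "a");
    stored.put("price", 10);
    stored.put("ts", 100);
    Map<String, Object> incoming = new LinkedHashMap<>();
    incoming.put("id", 1);
    incoming.put("price", 12);
    incoming.put("ts", 200);
    return merge(stored, incoming, Arrays.asList("id", "price", "ts"));
  }

  public static void main(String[] args) {
    System.out.println(example()); // {id=1, name=a, price=12, ts=200}
  }
}
```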
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -123,6 +127,22 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
init(fileId, this.partitionPath, dataFileToBeMerged);
}
+ protected static Pair<Schema, Schema> getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config, HoodieTable hoodieTable) {
+ Schema originalSchema = new Schema.Parser().parse(config.getSchema());
+ Schema hoodieSchema = HoodieAvroUtils.addMetadataFields(originalSchema);
+ boolean updatePartialFields = config.updatePartialFields();
+ if (updatePartialFields) {
+ try {
+ TableSchemaResolver resolver = new TableSchemaResolver(hoodieTable.getMetaClient());
+ Schema lastSchema = resolver.getTableAvroSchema();
+ config.setLastSchema(lastSchema.toString());
Review comment:
In fact, I would prefer to throw an exception if the table schema is not
present and someone is trying to use updatePartialFields.
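A rough sketch of the fail-fast variant. The names are hypothetical: WriteConfig stands in for HoodieWriteConfig, and the Supplier stands in for the TableSchemaResolver call, modeled as possibly returning null. A real change would more likely throw one of Hudi's own exception types than IllegalStateException.

```java
import java.util.function.Supplier;

// Hypothetical fail-fast guard for updatePartialFields; not Hudi's API.
public class RequireTableSchemaDemo {

  static final class WriteConfig {
    private String lastSchema;

    String getLastSchema() {
      return lastSchema;
    }

    void setLastSchema(String lastSchema) {
      this.lastSchema = lastSchema;
    }
  }

  // Partial updates need the existing table schema, so fail fast when it is missing.
  static void setLastSchemaOrFail(WriteConfig config, Supplier<String> tableSchemaResolver) {
    String tableSchema = tableSchemaResolver.get();
    if (tableSchema == null) {
      throw new IllegalStateException(
          "updatePartialFields is enabled, but no table schema could be resolved");
    }
    config.setLastSchema(tableSchema);
  }

  // True if the guard rejects a missing schema.
  static boolean failsWhenSchemaMissing() {
    try {
      setLastSchemaOrFail(new WriteConfig(), () -> null);
      return false;
    } catch (IllegalStateException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    WriteConfig config = new WriteConfig();
    setLastSchemaOrFail(config, () -> "table-schema");
    System.out.println(config.getLastSchema());   // table-schema
    System.out.println(failsWhenSchemaMissing()); // true
  }
}
```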
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
##########
@@ -0,0 +1,54 @@
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload {
+ public PartialUpdatePayload(GenericRecord record, Comparable orderingVal) {
+ super(record, orderingVal);
+ }
+
+ public PartialUpdatePayload(Option<GenericRecord> record) {
+ this(record.get(), (record1) -> 0); // natural order
+ }
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord lastValue, Schema schema) throws IOException {
+ Option<IndexedRecord> recordOption = getInsertValue(schema);
+ if (recordOption.isPresent()) {
+ IndexedRecord record = recordOption.get();
+ GenericRecord current = (GenericRecord) record;
+
+ List<Schema.Field> fieldList = schema.getFields();
+ GenericRecord last = (GenericRecord) lastValue;
+ for (Schema.Field field : fieldList) {
+ last.put(field.name(), current.get(field.name()));
+ }
+ return Option.ofNullable(last);
+ }
+ return recordOption;
+ }
+}
Review comment:
Please add a line break at the end of the file.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
##########
@@ -0,0 +1,54 @@
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload {
+ public PartialUpdatePayload(GenericRecord record, Comparable orderingVal) {
+ super(record, orderingVal);
+ }
+
+ public PartialUpdatePayload(Option<GenericRecord> record) {
+ this(record.get(), (record1) -> 0); // natural order
+ }
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord lastValue, Schema schema) throws IOException {
Review comment:
Does schema here refer to the incoming partial schema?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -123,6 +127,22 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
init(fileId, this.partitionPath, dataFileToBeMerged);
}
+ protected static Pair<Schema, Schema> getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config, HoodieTable hoodieTable) {
+ Schema originalSchema = new Schema.Parser().parse(config.getSchema());
+ Schema hoodieSchema = HoodieAvroUtils.addMetadataFields(originalSchema);
+ boolean updatePartialFields = config.updatePartialFields();
+ if (updatePartialFields) {
+ try {
+ TableSchemaResolver resolver = new TableSchemaResolver(hoodieTable.getMetaClient());
+ Schema lastSchema = resolver.getTableAvroSchema();
+ config.setLastSchema(lastSchema.toString());
Review comment:
lastSchema could be null if this is the first commit, or if the last commit
was an operation that does not inject the schema into the commit metadata. Can
we check for null before setting the last schema? Otherwise, .toString() could
throw a NullPointerException.
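For example, a sketch only: WriteConfig and the Supplier are hypothetical stand-ins for HoodieWriteConfig and the TableSchemaResolver call, with the resolver modeled as returning null in the cases described above.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical null-safe version of the lastSchema assignment; not Hudi's API.
public class NullSafeLastSchemaDemo {

  static final class WriteConfig {
    private String lastSchema;

    String getLastSchema() {
      return lastSchema;
    }

    void setLastSchema(String lastSchema) {
      this.lastSchema = lastSchema;
    }
  }

  // As written in the PR: toString() on a null schema throws NullPointerException.
  static void setLastSchemaUnguarded(WriteConfig config, Supplier<Object> resolver) {
    config.setLastSchema(resolver.get().toString());
  }

  // Only set lastSchema when the resolver actually produced a schema.
  static void setLastSchemaIfPresent(WriteConfig config, Supplier<Object> resolver) {
    Optional.ofNullable(resolver.get())
        .map(Object::toString)
        .ifPresent(config::setLastSchema);
  }

  public static void main(String[] args) {
    WriteConfig config = new WriteConfig();
    setLastSchemaIfPresent(config, () -> null); // e.g. first commit: no schema yet
    System.out.println(config.getLastSchema()); // null, and no exception
    try {
      setLastSchemaUnguarded(config, () -> null);
    } catch (NullPointerException e) {
      System.out.println("unguarded version throws NPE");
    }
  }
}
```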
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
##########
@@ -0,0 +1,54 @@
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload {
+ public PartialUpdatePayload(GenericRecord record, Comparable orderingVal) {
+ super(record, orderingVal);
+ }
+
+ public PartialUpdatePayload(Option<GenericRecord> record) {
+ this(record.get(), (record1) -> 0); // natural order
+ }
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord lastValue, Schema schema) throws IOException {
Review comment:
nit: instead of "lastValue", maybe "existingValue" or
"existingStorageValue"?
--
This is an automated message from the Apache Git Service.