xushiyan commented on code in PR #5522:
URL: https://github.com/apache/hudi/pull/5522#discussion_r869492731
##########
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java:
##########
@@ -104,7 +105,7 @@ public void init() throws IOException, InterruptedException, URISyntaxException
         .withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs).build()) {
       // write data to file
-      List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+      List<HoodieRecord> records = SchemaTestUtil.generateTestRecords(0, 100).stream().map(HoodieIndexRecord::new).collect(Collectors.toList());
Review Comment:
   Instead of converting, can we add new APIs in the test utilities that generate `HoodieRecord`s directly? We need to deprecate and remove the test utils' Avro-specific APIs eventually as well.
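   To make the ask concrete, here is a minimal sketch of what such a test-utility API could look like. All names below (`SchemaTestUtilSketch`, `TestHoodieRecord`, `generateHoodieTestRecords`) are hypothetical stand-ins, not actual Hudi classes; the point is only the shape: callers receive record wrappers directly instead of mapping over Avro `IndexedRecord`s at every call site.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SchemaTestUtilSketch {

  // Stand-in for the engine-agnostic HoodieRecord wrapper.
  static class TestHoodieRecord {
    final String recordKey;

    TestHoodieRecord(String recordKey) {
      this.recordKey = recordKey;
    }
  }

  // Proposed shape: generate HoodieRecords directly, so call sites no
  // longer need to convert Avro IndexedRecords themselves.
  static List<TestHoodieRecord> generateHoodieTestRecords(int from, int count) {
    return IntStream.range(from, from + count)
        .mapToObj(i -> new TestHoodieRecord("key-" + i))
        .collect(Collectors.toList());
  }
}
```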
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java:
##########
@@ -79,31 +75,27 @@ public HoodieLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr, boolean are
   // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
   public static class HoodieInsertValueGenResult<T extends HoodieRecord> {
Review Comment:
   By convention, `T` is the type of the data residing inside a `HoodieRecord`. To establish a convention, can we use `R` to denote a subclass of `HoodieRecord`? We'd need to standardize this across the codebase.
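   A sketch of the proposed convention, using illustrative stand-in types rather than the real Hudi classes: `T` stays reserved for the data type carried inside a record, while `R` is bounded by the record type itself.

```java
// Stand-in for HoodieRecord<T>: T is the data carried inside the record.
abstract class HoodieRecordSketch<T> {
  abstract T getData();
}

// A concrete record whose data type happens to be String.
class StringRecordSketch extends HoodieRecordSketch<String> {
  private final String data;

  StringRecordSketch(String data) {
    this.data = data;
  }

  @Override
  String getData() {
    return data;
  }
}

// Proposed convention: `R` denotes a HoodieRecord subclass, so `T` is
// never reused for two different roles in the same signature.
class InsertValueGenResultSketch<R extends HoodieRecordSketch<?>> {
  final R record;

  InsertValueGenResultSketch(R record) {
    this.record = record;
  }
}
```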
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##########
@@ -68,9 +67,11 @@ public static HoodieFileSliceReader getFileSliceReader(
     }
   }

-  private static HoodieRecord<? extends HoodieRecordPayload> transform(
-      GenericRecord record, HoodieMergedLogRecordScanner scanner, String payloadClass,
-      String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
+  private static HoodieRecord transform(GenericRecord record,
+                                                        HoodieMergedLogRecordScanner scanner,
+                                                        String payloadClass,
+                                                        String preCombineField,
+                                                        Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
Review Comment:
fix the indentation?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexRecord.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * This is only used when returning records from the reader.
+ */
+public class HoodieIndexRecord extends HoodieRecord<IndexedRecord> {
Review Comment:
`HoodieAvroIndexedRecord` to be precise
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -502,12 +521,16 @@ private void writeToBuffer(HoodieRecord<T> record) {
       record.seal();
     }
     // fetch the ordering val first in case the record was deflated.
-    final Comparable<?> orderingVal = record.getData().getOrderingValue();
-    Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
+    final Comparable<?> orderingVal = ((HoodieRecordPayload) record.getData()).getOrderingValue();
+    Option<HoodieRecord> indexedRecord = prepareRecord(record);
     if (indexedRecord.isPresent()) {
       // Skip the ignored record.
-      if (!indexedRecord.get().equals(IGNORE_RECORD)) {
-        recordList.add(indexedRecord.get());
+      try {
+        if (indexedRecord.isPresent() && !indexedRecord.get().isIgnoredRecord(tableSchema, config.getProps())) {
+          recordList.add(indexedRecord.get());
+        }
+      } catch (IOException e) {
+        LOG.error("Error writing record " + indexedRecord.get(), e);
Review Comment:
   Why could this throw an IOException? I thought it's just checking some configs.
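   If the check really only consults the record and its configs, a signature without a checked exception would remove the try/catch at every call site. A rough sketch under that assumption (class, method, and property names here are all hypothetical, not Hudi APIs):

```java
import java.util.Properties;

// Hypothetical record stand-in: the ignore check consults only in-memory
// state and config, so it needs no checked exception.
class IgnorableRecordSketch {
  private final boolean sentinel;

  IgnorableRecordSketch(boolean sentinel) {
    this.sentinel = sentinel;
  }

  // No IOException in the signature: callers can branch directly
  // without wrapping the call in try/catch.
  boolean shouldIgnore(Properties props) {
    // "example.skip.records" is an invented property for illustration only.
    return sentinel || Boolean.parseBoolean(props.getProperty("example.skip.records", "false"));
  }
}
```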
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -130,22 +128,20 @@ public boolean canWrite(HoodieRecord record) {
    * Perform the actual writing of the given record into the backing file.
    */
   @Override
-  public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
-    Option recordMetadata = ((HoodieRecordPayload) record.getData()).getMetadata();
-    if (HoodieOperation.isDelete(record.getOperation())) {
-      avroRecord = Option.empty();
-    }
+  protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props) {
+    Option<Map<String, String>> recordMetadata = record.getMetadata();
     try {
-      if (avroRecord.isPresent()) {
-        if (avroRecord.get().equals(IGNORE_RECORD)) {
+      if (!HoodieOperation.isDelete(record.getOperation()) && record.isPresent(schema, config.getProps())) {
+        if (record.isIgnoredRecord(schema, config.getProps())) {
Review Comment:
   /nit: a more concise name could be `record.shouldIgnore()`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java:
##########
@@ -69,8 +69,8 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
   @Override
   public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
Review Comment:
   This var name is confusing, especially when we're removing the payload class; maybe just `genResult` to make it distinct?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -34,16 +41,55 @@
*/
public abstract class HoodieRecord<T> implements Serializable {
-  public static final String COMMIT_TIME_METADATA_FIELD = "_hoodie_commit_time";
-  public static final String COMMIT_SEQNO_METADATA_FIELD = "_hoodie_commit_seqno";
-  public static final String RECORD_KEY_METADATA_FIELD = "_hoodie_record_key";
-  public static final String PARTITION_PATH_METADATA_FIELD = "_hoodie_partition_path";
-  public static final String FILENAME_METADATA_FIELD = "_hoodie_file_name";
-  public static final String OPERATION_METADATA_FIELD = "_hoodie_operation";
-  public static final String HOODIE_IS_DELETED = "_hoodie_is_deleted";
+  public static final String COMMIT_TIME_METADATA_FIELD = HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName();
+  public static final String COMMIT_SEQNO_METADATA_FIELD = HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD.getFieldName();
+  public static final String RECORD_KEY_METADATA_FIELD = HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName();
+  public static final String PARTITION_PATH_METADATA_FIELD = HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.getFieldName();
+  public static final String FILENAME_METADATA_FIELD = HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName();
+  public static final String OPERATION_METADATA_FIELD = HoodieMetadataField.OPERATION_METADATA_FIELD.getFieldName();
+  public static final String HOODIE_IS_DELETED = HoodieMetadataField.DELETED_METADATA_FIELD.getFieldName();
+
+ public enum HoodieMetadataField {
+ COMMIT_TIME_METADATA_FIELD("_hoodie_commit_time"),
+ COMMIT_SEQNO_METADATA_FIELD("_hoodie_commit_seqno"),
+ RECORD_KEY_METADATA_FIELD("_hoodie_record_key"),
+ PARTITION_PATH_METADATA_FIELD("_hoodie_partition_path"),
+ FILENAME_METADATA_FIELD("_hoodie_file_name"),
+ OPERATION_METADATA_FIELD("_hoodie_operation"),
+ DELETED_METADATA_FIELD("_hoodie_is_deleted");
+
+ private final String fieldName;
+
+ HoodieMetadataField(String fieldName) {
+ this.fieldName = fieldName;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+ }
public static int FILENAME_METADATA_FIELD_POS = 4;
Review Comment:
make this pos info part of the enum as well?
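   One way to fold the position into the enum, as a sketch only: rely on declaration order matching the physical meta-column order, so `ordinal()` doubles as the field position and replaces the loose `FILENAME_METADATA_FIELD_POS` constant. The optional `_hoodie_operation` and `_hoodie_is_deleted` fields are left out here since they are not positional meta columns; how to handle them is an open question.

```java
// Sketch: declaration order mirrors the physical meta-column order,
// so ordinal() serves as the field position.
enum HoodieMetadataFieldSketch {
  COMMIT_TIME_METADATA_FIELD("_hoodie_commit_time"),
  COMMIT_SEQNO_METADATA_FIELD("_hoodie_commit_seqno"),
  RECORD_KEY_METADATA_FIELD("_hoodie_record_key"),
  PARTITION_PATH_METADATA_FIELD("_hoodie_partition_path"),
  FILENAME_METADATA_FIELD("_hoodie_file_name");

  private final String fieldName;

  HoodieMetadataFieldSketch(String fieldName) {
    this.fieldName = fieldName;
  }

  public String getFieldName() {
    return fieldName;
  }

  // Position within the meta columns; only valid while declaration order
  // matches the table's physical column order.
  public int getPosition() {
    return ordinal();
  }
}
```

   Note that `FILENAME_METADATA_FIELD.getPosition()` comes out as 4, matching the existing `FILENAME_METADATA_FIELD_POS = 4` constant in the diff.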
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -104,6 +105,19 @@ public class HoodieAvroUtils {
public static final Schema RECORD_KEY_SCHEMA = initRecordKeySchema();
+ /**
+ * TODO
Review Comment:
what's the plan here?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -201,33 +202,24 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
     return hoodieRecord.getCurrentLocation() != null;
   }

-  private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
-    Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
+  private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
+    Option<Map<String, String>> recordMetadata = hoodieRecord.getMetadata();
     try {
       // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
       // Whether it is an update or insert record.
       boolean isUpdateRecord = isUpdateRecord(hoodieRecord);
       // If the format can not record the operation field, nullify the DELETE payload manually.
       boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField();
       recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord));
-      Option<IndexedRecord> avroRecord = nullifyPayload ? Option.empty() : hoodieRecord.getData().getInsertValue(tableSchema, recordProperties);
-      if (avroRecord.isPresent()) {
-        if (avroRecord.get().equals(IGNORE_RECORD)) {
-          return avroRecord;
+      Option<HoodieRecord> finalRecord = Option.empty();
+      if (!nullifyPayload) {
+        if (hoodieRecord.isIgnoredRecord(tableSchema, recordProperties)) {
+          return Option.empty();
         }
         // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-        GenericRecord rewriteRecord = rewriteRecord((GenericRecord) avroRecord.get());
-        avroRecord = Option.of(rewriteRecord);
-        String seqId =
-            HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
-        if (config.populateMetaFields()) {
-          HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, hoodieRecord.getRecordKey(),
-              hoodieRecord.getPartitionPath(), fileId);
-          HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
-        }
-        if (config.allowOperationMetadataField()) {
-          HoodieAvroUtils.addOperationToRecord(rewriteRecord, hoodieRecord.getOperation());
-        }
+        HoodieRecord rewriteRecord = hoodieRecord.rewriteRecord(tableSchema, recordProperties, schemaOnReadEnabled, writeSchemaWithMetaFields);
+        HoodieRecord populateRecord = populateMetadataFields(rewriteRecord, tableSchema, recordProperties);
Review Comment:
   /nit: `rewrittenRecord` and `populatedRecord`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -243,14 +235,32 @@ private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
       // part of marking
       // record successful.
       hoodieRecord.deflate();
-      return avroRecord;
+      return finalRecord;
     } catch (Exception e) {
       LOG.error("Error writing record " + hoodieRecord, e);
       writeStatus.markFailure(hoodieRecord, e, recordMetadata);
     }
     return Option.empty();
   }

+  private HoodieRecord populateMetadataFields(HoodieRecord<T> hoodieRecord, Schema schema, Properties prop) throws IOException {
Review Comment:
   Should this be a common util method?
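   Sketching what a shared utility might look like: build the meta-field values in one place so the create, merge, and append handles share a single code path. The class and method names below are hypothetical; only the meta-field names are taken from `HoodieRecord` itself. The real version would write into the record via `HoodieAvroUtils`, not return a map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical shared utility: one place that knows which meta fields
// to populate, reusable across write handles.
final class MetadataFieldUtilSketch {

  private MetadataFieldUtilSketch() {
  }

  // Meta-field names match the HoodieRecord constants in the diff above.
  static Map<String, String> metadataFieldValues(String recordKey, String partitionPath,
                                                 String fileName, String instantTime, String seqId) {
    Map<String, String> meta = new HashMap<>();
    meta.put("_hoodie_commit_time", instantTime);
    meta.put("_hoodie_commit_seqno", seqId);
    meta.put("_hoodie_record_key", recordKey);
    meta.put("_hoodie_partition_path", partitionPath);
    meta.put("_hoodie_file_name", fileName);
    return meta;
  }
}
```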
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]