alexeykudinkin commented on code in PR #6697: URL: https://github.com/apache/hudi/pull/6697#discussion_r975777011
########## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; + +import org.apache.hudi.exception.HoodieException; + +public class HoodieCDCUtils { + + public static final String CDC_LOGFILE_SUFFIX = "-cdc"; + + /* the `op` column represents how a record is changed. */ + public static final String CDC_OPERATION_TYPE = "op"; + + /* the `ts` column represents when a record is changed. 
*/ + public static final String CDC_COMMIT_TIMESTAMP = "ts"; + + /* the pre-image before one record is changed */ + public static final String CDC_BEFORE_IMAGE = "before"; + + /* the post-image after one record is changed */ + public static final String CDC_AFTER_IMAGE = "after"; + + /* the key of the changed record */ + public static final String CDC_RECORD_KEY = "record_key"; + + public static final String[] CDC_COLUMNS = new String[] { + CDC_OPERATION_TYPE, + CDC_COMMIT_TIMESTAMP, + CDC_BEFORE_IMAGE, + CDC_AFTER_IMAGE + }; + + /** + * This is the standard CDC output format. + * Also, this is the schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'. + */ + public static final String CDC_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"Record\"," + + "\"fields\":[" + + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"ts\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"after\",\"type\":[\"string\",\"null\"]}" + + "]}"; + + public static final Schema CDC_SCHEMA = new Schema.Parser().parse(CDC_SCHEMA_STRING); + + /** + * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'. + */ + public static final String CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING = "{\"type\":\"record\",\"name\":\"Record\"," + + "\"fields\":[" + + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}" + + "]}"; + + public static final Schema CDC_SCHEMA_OP_RECORDKEY_BEFORE = + new Schema.Parser().parse(CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING); + + /** + * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'op_key'. 
+ */ + public static final String CDC_SCHEMA_OP_AND_RECORDKEY_STRING = "{\"type\":\"record\",\"name\":\"Record\"," + + "\"fields\":[" + + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]}" + + "]}"; + + public static final Schema CDC_SCHEMA_OP_AND_RECORDKEY = + new Schema.Parser().parse(CDC_SCHEMA_OP_AND_RECORDKEY_STRING); + + public static final Schema schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode supplementalLoggingMode) { + switch (supplementalLoggingMode) { + case WITH_BEFORE_AFTER: + return CDC_SCHEMA; + case WITH_BEFORE: + return CDC_SCHEMA_OP_RECORDKEY_BEFORE; + case OP_KEY: + return CDC_SCHEMA_OP_AND_RECORDKEY; + default: + throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode); + } + } + + /** + * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'. + */ + public static GenericData.Record cdcRecord( + String op, String commitTime, GenericRecord before, GenericRecord after) { + String beforeJsonStr = recordToJson(before); Review Comment: @YannByron let's make sure we follow-up on this (and let's also create a Jira for it to make sure we're not losing the context) ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.cdc.HoodieCDCOperation; +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; +import org.apache.hudi.common.table.cdc.HoodieCDCUtils; +import org.apache.hudi.common.table.log.AppendResult; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.util.DefaultSizeEstimator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieUpsertException; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; 
+import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * This class encapsulates all the cdc-writing functions. + */ +public class HoodieCDCLogger implements Closeable { + + private final String commitTime; + + private final String keyField; + + private final Schema dataSchema; + + private final boolean populateMetaFields; + + // writer for cdc data + private final HoodieLogFormat.Writer cdcWriter; + + private final boolean cdcEnabled; + + private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode; + + private final Schema cdcSchema; + + private final String cdcSchemaString; + + // the cdc data + private final Map<String, HoodieAvroPayload> cdcData; + + public HoodieCDCLogger( + String commitTime, + HoodieWriteConfig config, + HoodieTableConfig tableConfig, + Schema schema, + HoodieLogFormat.Writer cdcWriter, + long maxInMemorySizeInBytes) { + try { + this.commitTime = commitTime; + this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema); + this.populateMetaFields = config.populateMetaFields(); + this.keyField = populateMetaFields ? 
HoodieRecord.RECORD_KEY_METADATA_FIELD + : tableConfig.getRecordKeyFieldProp(); + this.cdcWriter = cdcWriter; + + this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED); + this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)); + + if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) { + this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA; + this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_STRING; + } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) { + this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE; + this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING; + } else { + this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY; + this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY_STRING; + } + + this.cdcData = new ExternalSpillableMap<>( + maxInMemorySizeInBytes, + config.getSpillableMapBasePath(), + new DefaultSizeEstimator<>(), + new DefaultSizeEstimator<>(), + config.getCommonConfig().getSpillableDiskMapType(), + config.getCommonConfig().isBitCaskDiskMapCompressionEnabled() + ); + } catch (IOException e) { + throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e); + } + } + + public void put(HoodieRecord hoodieRecord, + GenericRecord oldRecord, + Option<IndexedRecord> newRecord) { + if (cdcEnabled) { + String recordKey = hoodieRecord.getRecordKey(); + GenericData.Record cdcRecord; + if (newRecord.isPresent()) { + GenericRecord record = (GenericRecord) newRecord.get(); + if (oldRecord == null) { + // inserted cdc record + cdcRecord = createCDCRecord(HoodieCDCOperation.INSERT, recordKey, + null, record); + } else { + // updated cdc record + cdcRecord = createCDCRecord(HoodieCDCOperation.UPDATE, recordKey, + oldRecord, record); + } + } else { + // deleted cdc record + cdcRecord = 
createCDCRecord(HoodieCDCOperation.DELETE, recordKey, + oldRecord, null); + } + cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord))); + } + } + + private GenericData.Record createCDCRecord(HoodieCDCOperation operation, + String recordKey, + GenericRecord oldRecord, + GenericRecord newRecord) { + GenericData.Record record; + if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) { + record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime, + removeCommitMetadata(oldRecord), newRecord); + } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) { + record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey, + removeCommitMetadata(oldRecord)); + } else { + record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey); + } + return record; + } + + private GenericRecord removeCommitMetadata(GenericRecord record) { + return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>()); + } + + public boolean isEmpty() { + return !this.cdcEnabled || this.cdcData.isEmpty(); + } + + public Option<AppendResult> writeCDCData() { + if (isEmpty()) { + return Option.empty(); + } + + try { + List<IndexedRecord> records = cdcData.values().stream() + .map(record -> { + try { + return record.getInsertValue(cdcSchema).get(); + } catch (IOException e) { + throw new HoodieIOException("Failed to get cdc record", e); + } + }).collect(Collectors.toList()); + + Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchemaString); + + HoodieLogBlock block = new HoodieCDCDataBlock(records, header, keyField); + AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block)); + + // call close to trigger the data flush. 
+ this.close(); + + return Option.of(result); + } catch (Exception e) { + throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e); + } + } + + @Override + public void close() { + try { + if (cdcWriter != null) { + cdcWriter.close(); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to close HoodieCDCLogger", e); + } finally { + cdcData.clear(); + } + } + + public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger, Review Comment: This method we can move back to `HoodieWriteHandle` as this behavior should be controlled from the `WriteHandle` (previous comments about extraction were related to `setCDCStatIfNeeded`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
