alexeykudinkin commented on code in PR #6727: URL: https://github.com/apache/hudi/pull/6727#discussion_r980469454
########## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +/** + * Here define five cdc infer cases. The different cdc infer case will decide which file will be + * used to extract the change data, and how to do this. + * + * AS_IS: + * For this type, there must be a real cdc log file from which we get the whole/part change data. + * when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after', it keeps all the fields about the + * change data, including `op`, `ts_ms`, `before` and `after`. So read it and return directly, + * no more other files need to be loaded. + * when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before', it keeps the `op`, the key and the + * `before` of the changing record. When `op` is equal to 'i' or 'u', need to get the current record from the + * current base/log file as `after`. + * when `hoodie.table.cdc.supplemental.logging.mode` is 'op_key', it just keeps the `op` and the key of + * the changing record. 
When `op` is equal to 'i', `before` is null and get the current record + * from the current base/log file as `after`. When `op` is equal to 'u', get the previous + * record from the previous file slice as `before`, and get the current record from the + * current base/log file as `after`. When `op` is equal to 'd', get the previous record from + * the previous file slice as `before`, and `after` is null. + * + * BASE_FILE_INSERT: + * For this type, there must be a base file at the current instant. All the records from this + * file is new-coming, so we can load this, mark all the records with `i`, and treat them as + * the value of `after`. The value of `before` for each record is null. + * + * BASE_FILE_INSERT: Review Comment: Typo ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ########## @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cdc + +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.HoodieConversionUtils._ +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import 
org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import java.io.Closeable +import java.util.Properties +import java.util.stream.Collectors + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The split that will be processed by spark task. + */ +case class HoodieCDCFileGroupSplit( + commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)] +) + +/** + * The Spark [[Partition]]'s implementation. + */ +case class HoodieCDCFileGroupPartition( + index: Int, + split: HoodieCDCFileGroupSplit +) extends Partition + +class HoodieCDCRDD( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + parquetReader: PartitionedFile => Iterator[InternalRow], + originTableSchema: HoodieTableSchema, + cdcSchema: StructType, + requiredCdcSchema: StructType, + changes: Array[HoodieCDCFileGroupSplit]) + extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD { + + @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration + + private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) + + private val cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + metaClient.getTableConfig.cdcSupplementalLoggingMode + ) + + private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty) + + protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField) + .map { preCombineField => + HoodiePayloadConfig.newBuilder + .withPayloadOrderingField(preCombineField) + .build + .getProps + }.getOrElse(new Properties()) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val cdcPartition = 
split.asInstanceOf[HoodieCDCFileGroupPartition] + new CDCFileGroupIterator(cdcPartition.split, metaClient) + } + + override protected def getPartitions: Array[Partition] = { + changes.zipWithIndex.map{ case (split, index) => + HoodieCDCFileGroupPartition(index, split) + }.toArray + } + + private class CDCFileGroupIterator( + split: HoodieCDCFileGroupSplit, + metaClient: HoodieTableMetaClient + ) extends Iterator[InternalRow] with SparkAdapterSupport with AvroDeserializerSupport with Closeable { + + private lazy val fs = metaClient.getFs.getFileSystem + + private lazy val conf = new Configuration(confBroadcast.value.value) + + private lazy val basePath = metaClient.getBasePathV2 + + private lazy val tableConfig = metaClient.getTableConfig + + private lazy val populateMetaFields = tableConfig.populateMetaFields() + + private lazy val keyGenerator = { + val props = new TypedProperties() + props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, tableConfig.getKeyGeneratorClassName) + props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, tableConfig.getRecordKeyFieldProp) + props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, tableConfig.getPartitionFieldProp) + HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) + } + + private lazy val recordKeyField: String = if (populateMetaFields) { + HoodieRecord.RECORD_KEY_METADATA_FIELD + } else { + val keyFields = metaClient.getTableConfig.getRecordKeyFields.get() + checkState(keyFields.length == 1) + keyFields.head + } + + private lazy val preCombineFieldOpt: Option[String] = Option(metaClient.getTableConfig.getPreCombineField) + + private lazy val tableState = { + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(props) + .build(); + HoodieTableState( + pathToString(basePath), + split.commitToChanges.map(_._1.getTimestamp).max, + recordKeyField, + preCombineFieldOpt, + usesVirtualKeys = false, + metaClient.getTableConfig.getPayloadClass, + metadataConfig + ) + } + + private lazy val 
mapper: ObjectMapper = { + val _mapper = new ObjectMapper + _mapper.setSerializationInclusion(Include.NON_ABSENT) + _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + _mapper.registerModule(DefaultScalaModule) + _mapper + } + + protected override val avroSchema: Schema = new Schema.Parser().parse(originTableSchema.avroSchemaStr) + + protected override val structTypeSchema: StructType = originTableSchema.structTypeSchema + + private lazy val serializer = sparkAdapter.createAvroSerializer(originTableSchema.structTypeSchema, + avroSchema, nullable = false) + + private lazy val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(avroSchema) + + private lazy val cdcAvroSchema: Schema = HoodieCDCUtils.schemaBySupplementalLoggingMode( + cdcSupplementalLoggingMode, + HoodieAvroUtils.removeMetadataFields(avroSchema) + ) + + private lazy val cdcSparkSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(cdcAvroSchema) + + /** + * The deserializer used to convert the CDC GenericRecord to Spark InternalRow. + */ + private lazy val cdcRecordDeserializer: HoodieAvroDeserializer = { + sparkAdapter.createAvroDeserializer(cdcAvroSchema, cdcSparkSchema) + } + + private lazy val projection: UnsafeProjection = generateUnsafeProjection(cdcSchema, requiredCdcSchema) + + // Iterator on cdc file + private val cdcFileIter = split.commitToChanges.sortBy(_._1).iterator + + // The instant that is currently being processed + private var currentInstant: HoodieInstant = _ + + // The change file that is currently being processed + private var currentChangeFile: HoodieCDCFileSplit = _ + + /** + * Two cases will use this to iterate over the records: + * 1) extract the change data from the base file directly, including 'ADD_BASE_File' and 'REMOVE_BASE_File'. 
Review Comment: @YannByron need to update this to align w/ `HoodieCDCInferenceCase` ########## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +/** + * Here define five cdc infer cases. The different cdc infer case will decide which file will be + * used to extract the change data, and how to do this. + * + * AS_IS: + * For this type, there must be a real cdc log file from which we get the whole/part change data. + * when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after', it keeps all the fields about the + * change data, including `op`, `ts_ms`, `before` and `after`. So read it and return directly, + * no more other files need to be loaded. + * when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before', it keeps the `op`, the key and the + * `before` of the changing record. When `op` is equal to 'i' or 'u', need to get the current record from the + * current base/log file as `after`. + * when `hoodie.table.cdc.supplemental.logging.mode` is 'op_key', it just keeps the `op` and the key of + * the changing record. 
When `op` is equal to 'i', `before` is null and get the current record + * from the current base/log file as `after`. When `op` is equal to 'u', get the previous + * record from the previous file slice as `before`, and get the current record from the + * current base/log file as `after`. When `op` is equal to 'd', get the previous record from + * the previous file slice as `before`, and `after` is null. + * + * BASE_FILE_INSERT: + * For this type, there must be a base file at the current instant. All the records from this + * file are new-coming, so we can load this, mark all the records with `i`, and treat them as + * the value of `after`. The value of `before` for each record is null. + * + * BASE_FILE_DELETE: + * For this type, there must be an empty file at the current instant, but a non-empty base file + * at the previous instant. First we find this base file that has the same file group and belongs + * to the previous instant. Then load this, mark all the records with `d`, and treat them as + * the value of `before`. The value of `after` for each record is null. + * + * LOG_FILE: + * For this type, a normal log file of mor table will be used. First we need to load the previous + * file slice(including the base file and other log files in the same file group). Then for each + * record from the log file, get the key of this, and execute the following steps: + * 1) if the record is deleted, + * a) if there is a record with the same key in the data loaded, `op` is 'd', 'before' is the + * record from the data loaded, `after` is null; + * b) if there is not a record with the same key in the data loaded, just skip. 
+ * 2) the record is not deleted, + * a) if there is a record with the same key in the data loaded, `op` is 'u', 'before' is the + * record from the data loaded, `after` is the current record; + * b) if there is not a record with the same key in the data loaded, `op` is 'i', 'before' is + * null, `after` is the current record; + * + * REPLACE_COMMIT: + * For this type, it must be a replacecommit, like INSERT_OVERWRITE and DROP_PARTITION. It drops + * a whole file group. First we find this file group. Then load this, mark all the records with + * `d`, and treat them as the value of `before`. The value of `after` for each record is null. + */ +public enum HoodieCDCInferCase { Review Comment: "Infer" is a verb, we should rather call this `HoodieCDCInferenceCase` ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ########## @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cdc + +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.HoodieConversionUtils._ +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import 
org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import java.io.Closeable +import java.util.Properties +import java.util.stream.Collectors + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The split that will be processed by spark task. + */ +case class HoodieCDCFileGroupSplit( + commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)] +) + +/** + * The Spark [[Partition]]'s implementation. + */ +case class HoodieCDCFileGroupPartition( + index: Int, + split: HoodieCDCFileGroupSplit +) extends Partition + +class HoodieCDCRDD( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + parquetReader: PartitionedFile => Iterator[InternalRow], + originTableSchema: HoodieTableSchema, + cdcSchema: StructType, + requiredCdcSchema: StructType, + changes: Array[HoodieCDCFileGroupSplit]) + extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD { + + @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration + + private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) + + private val cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + metaClient.getTableConfig.cdcSupplementalLoggingMode + ) + + private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty) + + protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField) + .map { preCombineField => + HoodiePayloadConfig.newBuilder + .withPayloadOrderingField(preCombineField) + .build + .getProps + }.getOrElse(new Properties()) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val cdcPartition = 
split.asInstanceOf[HoodieCDCFileGroupPartition] + new CDCFileGroupIterator(cdcPartition.split, metaClient) + } + + override protected def getPartitions: Array[Partition] = { + changes.zipWithIndex.map{ case (split, index) => + HoodieCDCFileGroupPartition(index, split) + }.toArray + } + + private class CDCFileGroupIterator( + split: HoodieCDCFileGroupSplit, + metaClient: HoodieTableMetaClient + ) extends Iterator[InternalRow] with SparkAdapterSupport with AvroDeserializerSupport with Closeable { + + private lazy val fs = metaClient.getFs.getFileSystem + + private lazy val conf = new Configuration(confBroadcast.value.value) Review Comment: Let's avoid copying (and do it only if we modify it) -- it's not cheap ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ########## @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cdc + +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.HoodieConversionUtils._ +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import 
org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import java.io.Closeable +import java.util.Properties +import java.util.stream.Collectors + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The split that will be processed by spark task. + */ +case class HoodieCDCFileGroupSplit( + commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)] +) + +/** + * The Spark [[Partition]]'s implementation. + */ +case class HoodieCDCFileGroupPartition( + index: Int, + split: HoodieCDCFileGroupSplit +) extends Partition + +class HoodieCDCRDD( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + parquetReader: PartitionedFile => Iterator[InternalRow], + originTableSchema: HoodieTableSchema, + cdcSchema: StructType, + requiredCdcSchema: StructType, + changes: Array[HoodieCDCFileGroupSplit]) Review Comment: @YannByron let's make sure we annotate this as `@transient` (these shouldn't be serialized and passed down to executor, similar to other RDDs) ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala: ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cdc + +import org.apache.hudi.AvroConversionUtils +import org.apache.hudi.DataSourceReadOptions +import org.apache.hudi.HoodieDataSourceHelper +import org.apache.hudi.HoodieTableSchema +import org.apache.hudi.common.table.cdc.HoodieCDCUtils._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.TableSchemaResolver +import org.apache.hudi.common.table.cdc.HoodieCDCExtractor +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.internal.schema.InternalSchema + +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.{Row, SQLContext, SparkSession} +import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} + +/** + * Hoodie CDC Relation extends Spark's [[BaseRelation]], provide the schema of cdc + * and the [[buildScan]] to return the change-data in a specified range. 
+ */ +class CDCRelation( Review Comment: Let's make sure this is rebased onto `HoodieBaseRelation` ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ########## @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cdc + +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.HoodieConversionUtils._ +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import 
org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import java.io.Closeable +import java.util.Properties +import java.util.stream.Collectors + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The split that will be processed by spark task. + */ +case class HoodieCDCFileGroupSplit( + commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)] +) + +/** + * The Spark [[Partition]]'s implementation. + */ +case class HoodieCDCFileGroupPartition( + index: Int, + split: HoodieCDCFileGroupSplit +) extends Partition + +class HoodieCDCRDD( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + parquetReader: PartitionedFile => Iterator[InternalRow], + originTableSchema: HoodieTableSchema, + cdcSchema: StructType, + requiredCdcSchema: StructType, + changes: Array[HoodieCDCFileGroupSplit]) + extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD { + + @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration Review Comment: Let's inline this to avoid mistakes ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ########## @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cdc + +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.HoodieConversionUtils._ +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.avro.Schema +import 
org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import java.io.Closeable +import java.util.Properties +import java.util.stream.Collectors + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The split that will be processed by spark task. + */ +case class HoodieCDCFileGroupSplit( + commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)] +) + +/** + * The Spark [[Partition]]'s implementation. 
+ */ +case class HoodieCDCFileGroupPartition( + index: Int, + split: HoodieCDCFileGroupSplit +) extends Partition + +class HoodieCDCRDD( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + parquetReader: PartitionedFile => Iterator[InternalRow], + originTableSchema: HoodieTableSchema, + cdcSchema: StructType, + requiredCdcSchema: StructType, + changes: Array[HoodieCDCFileGroupSplit]) + extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD { + + @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration + + private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) + + private val cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + metaClient.getTableConfig.cdcSupplementalLoggingMode + ) + + private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty) + + protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField) + .map { preCombineField => + HoodiePayloadConfig.newBuilder + .withPayloadOrderingField(preCombineField) + .build + .getProps + }.getOrElse(new Properties()) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val cdcPartition = split.asInstanceOf[HoodieCDCFileGroupPartition] + new CDCFileGroupIterator(cdcPartition.split, metaClient) + } + + override protected def getPartitions: Array[Partition] = { + changes.zipWithIndex.map{ case (split, index) => + HoodieCDCFileGroupPartition(index, split) + }.toArray + } + + private class CDCFileGroupIterator( + split: HoodieCDCFileGroupSplit, + metaClient: HoodieTableMetaClient + ) extends Iterator[InternalRow] with SparkAdapterSupport with AvroDeserializerSupport with Closeable { + + private lazy val fs = metaClient.getFs.getFileSystem + + private lazy val conf = new Configuration(confBroadcast.value.value) + + private lazy val basePath = metaClient.getBasePathV2 + + private lazy val tableConfig = metaClient.getTableConfig + + 
private lazy val populateMetaFields = tableConfig.populateMetaFields() + + private lazy val keyGenerator = { + val props = new TypedProperties() + props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, tableConfig.getKeyGeneratorClassName) + props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, tableConfig.getRecordKeyFieldProp) + props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, tableConfig.getPartitionFieldProp) + HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) + } + + private lazy val recordKeyField: String = if (populateMetaFields) { + HoodieRecord.RECORD_KEY_METADATA_FIELD + } else { + val keyFields = metaClient.getTableConfig.getRecordKeyFields.get() + checkState(keyFields.length == 1) + keyFields.head + } + + private lazy val preCombineFieldOpt: Option[String] = Option(metaClient.getTableConfig.getPreCombineField) + + private lazy val tableState = { + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(props) + .build(); + HoodieTableState( + pathToString(basePath), + split.commitToChanges.map(_._1.getTimestamp).max, + recordKeyField, + preCombineFieldOpt, + usesVirtualKeys = false, Review Comment: Should be `!populateMetaFields` ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ########## @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cdc + +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.HoodieConversionUtils._ +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.avro.Schema +import 
org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import java.io.Closeable +import java.util.Properties +import java.util.stream.Collectors + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The split that will be processed by spark task. + */ +case class HoodieCDCFileGroupSplit( + commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)] +) + +/** + * The Spark [[Partition]]'s implementation. 
+ */ +case class HoodieCDCFileGroupPartition( + index: Int, + split: HoodieCDCFileGroupSplit +) extends Partition + +class HoodieCDCRDD( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + parquetReader: PartitionedFile => Iterator[InternalRow], + originTableSchema: HoodieTableSchema, + cdcSchema: StructType, + requiredCdcSchema: StructType, + changes: Array[HoodieCDCFileGroupSplit]) + extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD { + + @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration + + private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) + + private val cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + metaClient.getTableConfig.cdcSupplementalLoggingMode + ) + + private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty) + + protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField) + .map { preCombineField => + HoodiePayloadConfig.newBuilder + .withPayloadOrderingField(preCombineField) + .build + .getProps + }.getOrElse(new Properties()) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val cdcPartition = split.asInstanceOf[HoodieCDCFileGroupPartition] + new CDCFileGroupIterator(cdcPartition.split, metaClient) + } + + override protected def getPartitions: Array[Partition] = { + changes.zipWithIndex.map{ case (split, index) => + HoodieCDCFileGroupPartition(index, split) + }.toArray + } + + private class CDCFileGroupIterator( + split: HoodieCDCFileGroupSplit, + metaClient: HoodieTableMetaClient + ) extends Iterator[InternalRow] with SparkAdapterSupport with AvroDeserializerSupport with Closeable { + + private lazy val fs = metaClient.getFs.getFileSystem + + private lazy val conf = new Configuration(confBroadcast.value.value) + + private lazy val basePath = metaClient.getBasePathV2 + + private lazy val tableConfig = metaClient.getTableConfig + + 
private lazy val populateMetaFields = tableConfig.populateMetaFields() + + private lazy val keyGenerator = { + val props = new TypedProperties() + props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, tableConfig.getKeyGeneratorClassName) + props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, tableConfig.getRecordKeyFieldProp) + props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, tableConfig.getPartitionFieldProp) + HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) + } + + private lazy val recordKeyField: String = if (populateMetaFields) { + HoodieRecord.RECORD_KEY_METADATA_FIELD + } else { + val keyFields = metaClient.getTableConfig.getRecordKeyFields.get() + checkState(keyFields.length == 1) + keyFields.head + } + + private lazy val preCombineFieldOpt: Option[String] = Option(metaClient.getTableConfig.getPreCombineField) + + private lazy val tableState = { + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(props) + .build(); + HoodieTableState( + pathToString(basePath), + split.commitToChanges.map(_._1.getTimestamp).max, + recordKeyField, + preCombineFieldOpt, + usesVirtualKeys = false, + metaClient.getTableConfig.getPayloadClass, + metadataConfig + ) + } + + private lazy val mapper: ObjectMapper = { + val _mapper = new ObjectMapper + _mapper.setSerializationInclusion(Include.NON_ABSENT) + _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + _mapper.registerModule(DefaultScalaModule) + _mapper + } + + protected override val avroSchema: Schema = new Schema.Parser().parse(originTableSchema.avroSchemaStr) + + protected override val structTypeSchema: StructType = originTableSchema.structTypeSchema + + private lazy val serializer = sparkAdapter.createAvroSerializer(originTableSchema.structTypeSchema, + avroSchema, nullable = false) + + private lazy val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(avroSchema) + + private lazy val cdcAvroSchema: Schema = 
HoodieCDCUtils.schemaBySupplementalLoggingMode( + cdcSupplementalLoggingMode, + HoodieAvroUtils.removeMetadataFields(avroSchema) + ) + + private lazy val cdcSparkSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(cdcAvroSchema) + + /** + * The deserializer used to convert the CDC GenericRecord to Spark InternalRow. + */ + private lazy val cdcRecordDeserializer: HoodieAvroDeserializer = { + sparkAdapter.createAvroDeserializer(cdcAvroSchema, cdcSparkSchema) + } + + private lazy val projection: UnsafeProjection = generateUnsafeProjection(cdcSchema, requiredCdcSchema) + + // Iterator on cdc file + private val cdcFileIter = split.commitToChanges.sortBy(_._1).iterator + + // The instant that is currently being processed + private var currentInstant: HoodieInstant = _ + + // The change file that is currently being processed + private var currentChangeFile: HoodieCDCFileSplit = _ Review Comment: nit: `currentCDCFileSplit` ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ########## @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cdc + +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.HoodieConversionUtils._ +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import 
org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import java.io.Closeable +import java.util.Properties +import java.util.stream.Collectors + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The split that will be processed by spark task. + */ +case class HoodieCDCFileGroupSplit( + commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)] +) + +/** + * The Spark [[Partition]]'s implementation. + */ +case class HoodieCDCFileGroupPartition( + index: Int, + split: HoodieCDCFileGroupSplit +) extends Partition + +class HoodieCDCRDD( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + parquetReader: PartitionedFile => Iterator[InternalRow], + originTableSchema: HoodieTableSchema, + cdcSchema: StructType, + requiredCdcSchema: StructType, + changes: Array[HoodieCDCFileGroupSplit]) + extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD { + + @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration + + private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) + + private val cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + metaClient.getTableConfig.cdcSupplementalLoggingMode + ) + + private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty) + + protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField) + .map { preCombineField => + HoodiePayloadConfig.newBuilder + .withPayloadOrderingField(preCombineField) + .build + .getProps + }.getOrElse(new Properties()) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val cdcPartition = 
split.asInstanceOf[HoodieCDCFileGroupPartition] + new CDCFileGroupIterator(cdcPartition.split, metaClient) + } + + override protected def getPartitions: Array[Partition] = { + changes.zipWithIndex.map{ case (split, index) => + HoodieCDCFileGroupPartition(index, split) + }.toArray + } + + private class CDCFileGroupIterator( + split: HoodieCDCFileGroupSplit, + metaClient: HoodieTableMetaClient + ) extends Iterator[InternalRow] with SparkAdapterSupport with AvroDeserializerSupport with Closeable { + + private lazy val fs = metaClient.getFs.getFileSystem + + private lazy val conf = new Configuration(confBroadcast.value.value) + + private lazy val basePath = metaClient.getBasePathV2 + + private lazy val tableConfig = metaClient.getTableConfig + + private lazy val populateMetaFields = tableConfig.populateMetaFields() + + private lazy val keyGenerator = { + val props = new TypedProperties() + props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, tableConfig.getKeyGeneratorClassName) + props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, tableConfig.getRecordKeyFieldProp) + props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, tableConfig.getPartitionFieldProp) + HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) Review Comment: Why not just do `HoodieSparkKeyGeneratorFactory.createKeyGenerator(tableConfig.getProps())`? ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ########## @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cdc + +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.HoodieConversionUtils._ +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.avro.Schema +import 
org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import java.io.Closeable +import java.util.Properties +import java.util.stream.Collectors + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The split that will be processed by spark task. + */ +case class HoodieCDCFileGroupSplit( + commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)] +) + +/** + * The Spark [[Partition]]'s implementation. 
+ */ +case class HoodieCDCFileGroupPartition( + index: Int, + split: HoodieCDCFileGroupSplit +) extends Partition + +class HoodieCDCRDD( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + parquetReader: PartitionedFile => Iterator[InternalRow], + originTableSchema: HoodieTableSchema, + cdcSchema: StructType, + requiredCdcSchema: StructType, + changes: Array[HoodieCDCFileGroupSplit]) + extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD { + + @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration + + private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) + + private val cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + metaClient.getTableConfig.cdcSupplementalLoggingMode + ) + + private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty) + + protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField) + .map { preCombineField => + HoodiePayloadConfig.newBuilder + .withPayloadOrderingField(preCombineField) + .build + .getProps + }.getOrElse(new Properties()) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val cdcPartition = split.asInstanceOf[HoodieCDCFileGroupPartition] + new CDCFileGroupIterator(cdcPartition.split, metaClient) + } + + override protected def getPartitions: Array[Partition] = { + changes.zipWithIndex.map{ case (split, index) => + HoodieCDCFileGroupPartition(index, split) + }.toArray + } + + private class CDCFileGroupIterator( + split: HoodieCDCFileGroupSplit, + metaClient: HoodieTableMetaClient + ) extends Iterator[InternalRow] with SparkAdapterSupport with AvroDeserializerSupport with Closeable { + + private lazy val fs = metaClient.getFs.getFileSystem + + private lazy val conf = new Configuration(confBroadcast.value.value) + + private lazy val basePath = metaClient.getBasePathV2 + + private lazy val tableConfig = metaClient.getTableConfig + + 
private lazy val populateMetaFields = tableConfig.populateMetaFields() + + private lazy val keyGenerator = { + val props = new TypedProperties() + props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, tableConfig.getKeyGeneratorClassName) + props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, tableConfig.getRecordKeyFieldProp) + props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, tableConfig.getPartitionFieldProp) + HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) + } + + private lazy val recordKeyField: String = if (populateMetaFields) { + HoodieRecord.RECORD_KEY_METADATA_FIELD + } else { + val keyFields = metaClient.getTableConfig.getRecordKeyFields.get() + checkState(keyFields.length == 1) + keyFields.head + } + + private lazy val preCombineFieldOpt: Option[String] = Option(metaClient.getTableConfig.getPreCombineField) + + private lazy val tableState = { + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(props) + .build(); + HoodieTableState( + pathToString(basePath), + split.commitToChanges.map(_._1.getTimestamp).max, + recordKeyField, + preCombineFieldOpt, + usesVirtualKeys = false, + metaClient.getTableConfig.getPayloadClass, + metadataConfig + ) + } + + private lazy val mapper: ObjectMapper = { + val _mapper = new ObjectMapper + _mapper.setSerializationInclusion(Include.NON_ABSENT) + _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + _mapper.registerModule(DefaultScalaModule) + _mapper + } + + protected override val avroSchema: Schema = new Schema.Parser().parse(originTableSchema.avroSchemaStr) + + protected override val structTypeSchema: StructType = originTableSchema.structTypeSchema + + private lazy val serializer = sparkAdapter.createAvroSerializer(originTableSchema.structTypeSchema, + avroSchema, nullable = false) + + private lazy val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(avroSchema) + + private lazy val cdcAvroSchema: Schema = 
HoodieCDCUtils.schemaBySupplementalLoggingMode( + cdcSupplementalLoggingMode, + HoodieAvroUtils.removeMetadataFields(avroSchema) + ) + + private lazy val cdcSparkSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(cdcAvroSchema) + + /** + * The deserializer used to convert the CDC GenericRecord to Spark InternalRow. + */ + private lazy val cdcRecordDeserializer: HoodieAvroDeserializer = { + sparkAdapter.createAvroDeserializer(cdcAvroSchema, cdcSparkSchema) + } + + private lazy val projection: UnsafeProjection = generateUnsafeProjection(cdcSchema, requiredCdcSchema) + + // Iterator on cdc file + private val cdcFileIter = split.commitToChanges.sortBy(_._1).iterator + + // The instant that is currently being processed + private var currentInstant: HoodieInstant = _ + + // The change file that is currently being processed + private var currentChangeFile: HoodieCDCFileSplit = _ + + /** + * Two cases will use this to iterate over the records: + * 1) extract the change data from the base file directly, including 'ADD_BASE_File' and 'REMOVE_BASE_File'. + * 2) when the type of cdc file is 'REPLACED_FILE_GROUP', + * use this to trace the records that are converted from [[beforeImageRecords]] + */ + private var recordIter: Iterator[InternalRow] = Iterator.empty + + /** + * The only case where it will be used is to extract the change data from log files for a MOR table. + * At that time, 'logRecordIter' will work with [[beforeImageRecords]], which keeps all the records of the previous file slice. + */ + private var logRecordIter: Iterator[(String, HoodieRecord[_ <: HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])] = Iterator.empty + + /** + * The only case where it will be used is to extract the change data from cdc log files. + */ + private var cdcLogRecordIterator: HoodieCDCLogRecordIterator = _ + + /** + * The next record that needs to be returned when next() is called. 
+ */ + protected var recordToLoad: InternalRow = _ + + /** + * The list of files to which 'beforeImageRecords' belong. + * Use it to determine if 'beforeImageRecords' contains all the data required to extract + * the change data from the current cdc file. + */ + private val beforeImageFiles: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty + + /** + * Keep the before-image data. These cases will use this: + * 1) the cdc infer case is [[LOG_FILE]]; + * 2) the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is 'op_key'. + */ + private var beforeImageRecords: mutable.Map[String, GenericRecord] = mutable.Map.empty + + /** + * Keep the after-image data. Only one case will use this: + * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is 'op_key' or 'cdc_data_before'. + */ + private var afterImageRecords: mutable.Map[String, InternalRow] = mutable.Map.empty + + private def needLoadNextFile: Boolean = { + !recordIter.hasNext && + !logRecordIter.hasNext && + (cdcLogRecordIterator == null || !cdcLogRecordIterator.hasNext) + } + + @tailrec final def hasNextInternal: Boolean = { Review Comment: I think we should split `CDCFileGroupIterator` into N iterators for every `HoodieCDCInferenceCase` to make it more manageable and easier to understand ########## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java: ########## @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Locale; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static 
org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_INSERT; +import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.AS_IS; +import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.LOG_FILE; +import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_DELETE; +import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.REPLACE_COMMIT; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.isInRange; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; + +/** + * This class helps to extract all the information which will be used for a CDC query. + * + * There are some steps: + * 1. filter out the completed commit instants, and get the related [[HoodieCommitMetadata]] objects. + * 2. initialize the [[HoodieTableFileSystemView]] by the touched data files. + * 3. extract the cdc information: + * generate a [[CDCFileSplit]] object for each of the instants in (startInstant, endInstant] + * and each of the file groups that is touched in the range of instants. 
+ */ +public class HoodieCDCExtractor { + + private final HoodieTableMetaClient metaClient; + + private final Path basePath; + + private final FileSystem fs; + + private final HoodieCDCSupplementalLoggingMode supplementalLoggingMode; + + private final String startInstant; + + private final String endInstant; + + private Map<HoodieInstant, HoodieCommitMetadata> commits; + + private HoodieTableFileSystemView fsView; + + public HoodieCDCExtractor( + HoodieTableMetaClient metaClient, + String startInstant, + String endInstant) { + this.metaClient = metaClient; + this.basePath = metaClient.getBasePathV2(); + this.fs = metaClient.getFs().getFileSystem(); + this.supplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + metaClient.getTableConfig().cdcSupplementalLoggingMode()); + this.startInstant = startInstant; + this.endInstant = endInstant; + init(); + } + + private void init() { + initInstantAndCommitMetadatas(); + initFSView(); + } + + /** + * At the granularity of a file group, trace the mapping between + * each commit/instant and changes to this file group. 
+ */ + public Map<HoodieFileGroupId, List<Pair<HoodieInstant, HoodieCDCFileSplit>>> extractCDCFileSplits() { + if (commits == null || fsView == null) { + throw new HoodieException("Fail to init CDCExtractor"); + } + + Map<HoodieFileGroupId, List<Pair<HoodieInstant, HoodieCDCFileSplit>>> fgToCommitChanges = new HashMap<>(); + for (HoodieInstant instant : commits.keySet()) { + HoodieCommitMetadata commitMetadata = commits.get(instant); + + // parse `partitionToWriteStats` in the metadata of commit + Map<String, List<HoodieWriteStat>> ptToWriteStats = commitMetadata.getPartitionToWriteStats(); + for (String partition : ptToWriteStats.keySet()) { + List<HoodieWriteStat> hoodieWriteStats = ptToWriteStats.get(partition); + hoodieWriteStats.forEach(writeStat -> { + HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partition, writeStat.getFileId()); + // Identify the CDC source involved in this commit and + // determine its type for subsequent loading using different methods. + HoodieCDCFileSplit changeFile = + parseWriteStat(fileGroupId, instant, writeStat, commitMetadata.getOperationType()); + fgToCommitChanges.computeIfAbsent(fileGroupId, k -> new ArrayList<>()); + fgToCommitChanges.get(fileGroupId).add(Pair.of(instant, changeFile)); + }); + } + + if (commitMetadata instanceof HoodieReplaceCommitMetadata) { + HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata) commitMetadata; + Map<String, List<String>> ptToReplacedFileId = replaceCommitMetadata.getPartitionToReplaceFileIds(); + for (String partition : ptToReplacedFileId.keySet()) { + List<String> fileIds = ptToReplacedFileId.get(partition); + fileIds.forEach(fileId -> { + Option<FileSlice> latestFileSliceOpt = fsView.fetchLatestFileSlice(partition, fileId); + if (latestFileSliceOpt.isPresent()) { + HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partition, fileId); + HoodieCDCFileSplit changeFile = new HoodieCDCFileSplit( + REPLACE_COMMIT, null, latestFileSliceOpt, 
Option.empty()); + if (!fgToCommitChanges.containsKey(fileGroupId)) { + fgToCommitChanges.put(fileGroupId, new ArrayList<>()); + } + fgToCommitChanges.get(fileGroupId).add(Pair.of(instant, changeFile)); + } + }); + } + } + } + return fgToCommitChanges; + } + + /** + * Parse the commit metadata between (startInstant, endInstant], and extract the touched partitions + * and files to build the filesystem view. + */ + private void initFSView() { + Set<String> touchedPartitions = new HashSet<>(); + for (Map.Entry<HoodieInstant, HoodieCommitMetadata> entry : commits.entrySet()) { + HoodieCommitMetadata commitMetadata = entry.getValue(); + touchedPartitions.addAll(commitMetadata.getPartitionToWriteStats().keySet()); + if (commitMetadata instanceof HoodieReplaceCommitMetadata) { + touchedPartitions.addAll( + ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds().keySet() + ); + } + } + try { + List<FileStatus> touchedFiles = new ArrayList<>(); + for (String touchedPartition : touchedPartitions) { + Path partitionPath = FSUtils.getPartitionPath(basePath, touchedPartition); + touchedFiles.addAll(Arrays.asList(fs.listStatus(partitionPath))); + } + this.fsView = new HoodieTableFileSystemView( + metaClient, + metaClient.getCommitsTimeline().filterCompletedInstants(), + touchedFiles.toArray(new FileStatus[0]) + ); + } catch (Exception e) { + throw new HoodieException("Fail to init FileSystem View for CDC", e); + } + } + + + /** + * Extract the required instants from all the instants between (startInstant, endInstant]. + * + * There are some conditions: + * 1) the instant should be completed; + * 2) the instant should be in (startInstant, endInstant]; + * 3) the action of the instant is one of 'commit', 'deltacommit', 'replacecommit'; + * 4) the write type of the commit should have the ability to change the data. + * + * And, we need to recognize which is a 'replacecommit', that help to find the list of file group replaced. 
+ */ + private void initInstantAndCommitMetadatas() { + try { + List<String> requiredActions = Arrays.asList(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION); + HoodieActiveTimeline activeTimeLine = metaClient.getActiveTimeline(); + Map<HoodieInstant, HoodieCommitMetadata> result = activeTimeLine.getInstants() + .filter(instant -> + instant.isCompleted() + && isInRange(instant.getTimestamp(), startInstant, endInstant) + && requiredActions.contains(instant.getAction().toLowerCase(Locale.ROOT)) + ).map(instant -> { + HoodieCommitMetadata commitMetadata; + try { + if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + commitMetadata = HoodieReplaceCommitMetadata.fromBytes( + activeTimeLine.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class); + } else { + commitMetadata = HoodieCommitMetadata.fromBytes( + activeTimeLine.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + } + } catch (IOException e) { + throw new HoodieIOException(e.getMessage()); + } + return Pair.of(instant, commitMetadata); + }).filter(pair -> + maybeChangeData(pair.getRight().getOperationType()) + ).collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + this.commits = result; + } catch (Exception e) { + throw new HoodieIOException("Fail to get the commit metadata for CDC"); + } + } + + private Boolean maybeChangeData(WriteOperationType operation) { + return operation == WriteOperationType.INSERT + || operation == WriteOperationType.UPSERT + || operation == WriteOperationType.DELETE + || operation == WriteOperationType.BULK_INSERT + || operation == WriteOperationType.DELETE_PARTITION + || operation == WriteOperationType.INSERT_OVERWRITE + || operation == WriteOperationType.INSERT_OVERWRITE_TABLE + || operation == WriteOperationType.BOOTSTRAP; + } + + /** + * Parse HoodieWriteStat, judge which type the file is, and what strategy should be used to parse CDC data. + * Then build a [[HoodieCDCFileSplit]] object. 
+ */ + private HoodieCDCFileSplit parseWriteStat( + HoodieFileGroupId fileGroupId, + HoodieInstant instant, + HoodieWriteStat writeStat, + WriteOperationType operation) { + Path basePath = metaClient.getBasePathV2(); + FileSystem fs = metaClient.getFs().getFileSystem(); + + HoodieCDCFileSplit cdcFileSplit; + if (StringUtils.isNullOrEmpty(writeStat.getCdcPath())) { + // no cdc log files can be used directly. we reuse the existing data file to retrieve the change data. + String path = writeStat.getPath(); + if (FSUtils.isBaseFile(new Path(path))) { + // this is a base file + if (operation == WriteOperationType.DELETE && writeStat.getNumWrites() == 0L + && writeStat.getNumDeletes() != 0) { + // This is a delete operation wherein all the records in this file group are deleted + // and no records have been written out to a new file. + // So, we find the previous file that this operation deleted from, and treat each of + // the records as a deleted one. + HoodieBaseFile beforeBaseFile = fsView.getBaseFileOn( + fileGroupId.getPartitionPath(), writeStat.getPrevCommit(), fileGroupId.getFileId() + ).orElseThrow(() -> + new HoodieIOException("Can not get the previous version of the base file") + ); + FileSlice beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>()); + cdcFileSplit = new HoodieCDCFileSplit(BASE_FILE_DELETE, null, Option.empty(), Option.of(beforeFileSlice)); + } else if (writeStat.getNumUpdateWrites() == 0L && writeStat.getNumDeletes() == 0 + && writeStat.getNumWrites() == writeStat.getNumInserts()) { Review Comment: @YannByron there's an issue right now where we undercount inserts (AFAIR) and so `numWrites != numUpdates + numInserts` we need to be careful with these conditionals (and address the underlying issue as well) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
