alexeykudinkin commented on code in PR #6727: URL: https://github.com/apache/hudi/pull/6727#discussion_r979017718
########## hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala: ########## @@ -18,7 +18,9 @@ package org.apache.spark.sql.avro Review Comment: Please avoid any changes to the borrowed classes -- we keep changes to them to absolutely necessary minimum to make sure they do not diverge from Spark impl, and we're able to cherry-pick and carry forward these changes whenever we backport new version (from Spark) ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ########## @@ -0,0 +1,616 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cdc + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, LogIteratorUtils, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.HoodieConversionUtils._ +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.table.HoodieTableConfig._ +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieCDCLogRecordReader +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory +import org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import java.io.Closeable +import java.util.Properties +import java.util.function.Predicate +import java.util.stream.Collectors +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The split that will be processed by spark task. + */ +case class HoodieCDCFileGroupSplit( + commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)] +) + +/** + * The Spark [[Partition]]'s implementation. + */ +case class HoodieCDCFileGroupPartition( + index: Int, + split: HoodieCDCFileGroupSplit +) extends Partition + +class HoodieCDCRDD( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + parquetReader: PartitionedFile => Iterator[InternalRow], + originTableSchema: HoodieTableSchema, + cdcSchema: StructType, + requiredCdcSchema: StructType, + changes: Array[HoodieCDCFileGroupSplit]) + extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD { + + @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration + + private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) + + private val cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + metaClient.getTableConfig.cdcSupplementalLoggingMode + ) + + private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty) + + protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField) + .map { preCombineField => + HoodiePayloadConfig.newBuilder + .withPayloadOrderingField(preCombineField) + .build + .getProps + }.getOrElse(new Properties()) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val cdcPartition = split.asInstanceOf[HoodieCDCFileGroupPartition] + new CDCFileGroupIterator(cdcPartition.split, metaClient) + } + + override protected def getPartitions: Array[Partition] = { + changes.zipWithIndex.map{ case (split, index) => + HoodieCDCFileGroupPartition(index, split) + }.toArray + } + + private class CDCFileGroupIterator( + split: HoodieCDCFileGroupSplit, + metaClient: HoodieTableMetaClient + ) extends Iterator[InternalRow] with SparkAdapterSupport with AvroDeserializerSupport with Closeable { + + private val fs = metaClient.getFs.getFileSystem + + private val conf = new Configuration(confBroadcast.value.value) + + private val basePath = metaClient.getBasePathV2 + + private val tableConfig = metaClient.getTableConfig + + private val populateMetaFields = tableConfig.populateMetaFields() + + private val keyGenerator = { + val props = new TypedProperties() + props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, tableConfig.getKeyGeneratorClassName) + props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, tableConfig.getRecordKeyFieldProp) + props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, tableConfig.getPartitionFieldProp) + HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) + } + + private val recordKeyField: String = if (populateMetaFields) { + HoodieRecord.RECORD_KEY_METADATA_FIELD + } else { + val keyFields = metaClient.getTableConfig.getRecordKeyFields.get() + checkState(keyFields.length == 1) + keyFields.head + } + + private val preCombineFieldOpt: Option[String] = Option(metaClient.getTableConfig.getPreCombineField) + + private val tableState = { + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(props) + .build(); + HoodieTableState( + pathToString(basePath), + split.commitToChanges.map(_._1.getTimestamp).max, + recordKeyField, + preCombineFieldOpt, + usesVirtualKeys = false, + metaClient.getTableConfig.getPayloadClass, + metadataConfig + ) + } + + private lazy val mapper: ObjectMapper = { Review Comment: Do we still need this, given we moved to Avro? ########## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.util.Option; + +import java.io.Serializable; + +/** + * This contains all the information that retrieve the change data at a single file group and + * at a single commit. + * + * For [[cdcFileType]] = [[CDCFileTypeEnum.ADD_BASE_FILE]], [[cdcFile]] is a current version of + * the base file in the group, and [[beforeFileSlice]] is None. + * For [[cdcFileType]] = [[CDCFileTypeEnum.REMOVE_BASE_FILE]], [[cdcFile]] is null, + * [[beforeFileSlice]] is the previous version of the base file in the group. + * For [[cdcFileType]] = [[CDCFileTypeEnum.CDC_LOG_FILE]], [[cdcFile]] is a log file with cdc blocks. + * when enable the supplemental logging, both [[beforeFileSlice]] and [[afterFileSlice]] are None, + * otherwise these two are the previous and current version of the base file. + * For [[cdcFileType]] = [[CDCFileTypeEnum.MOR_LOG_FILE]], [[cdcFile]] is a normal log file and + * [[beforeFileSlice]] is the previous version of the file slice. + * For [[cdcFileType]] = [[CDCFileTypeEnum.REPLACED_FILE_GROUP]], [[cdcFile]] is null, + * [[beforeFileSlice]] is the current version of the file slice. + */ +public class HoodieCDCFileSplit implements Serializable { + + /** + * * the change type, which decide to how to retrieve the change data. more details see: [[CDCFileTypeEnum]] + * */ + private HoodieCDCLogicalFileType cdcFileType; Review Comment: Please make all fields final ########## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java: ########## @@ -236,6 +241,44 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except return JsonUtils.getObjectMapper().readValue(jsonStr, clazz); } + /** + * parse the bytes of deltacommit, and get the base file and the log files belonging to this + * provided file group. + */ + // TODO: refactor this method to avoid doing the json tree walking (HUDI-4822). + public static Option<Pair<String, List<String>>> getFileSliceForFileGroupFromDeltaCommit( + byte[] bytes, HoodieFileGroupId fileGroupId) { + String jsonStr = new String(bytes, StandardCharsets.UTF_8); + if (jsonStr.isEmpty()) { + return Option.empty(); + } + + try { + JsonNode ptToWriteStatsMap = JsonUtils.getObjectMapper().readTree(jsonStr).get("partitionToWriteStats"); Review Comment: Actually, why do we even need this method? We have the utils to deser the `HoodieCommitMetadata`, right? ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.JobConf + +import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection} +import org.apache.hudi.HoodieConversionUtils.toScalaOption +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.config.HoodiePayloadConfig +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes +import org.apache.hudi.LogIteratorUtils._ +import org.apache.hudi.internal.schema.InternalSchema + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.StructType + +import java.io.Closeable +import java.util.Properties + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ + +/** + * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in + * Delta Log files (represented as [[InternalRow]]s) + */ +class LogFileIterator(split: HoodieMergeOnReadFileSplit, Review Comment: @YannByron are you making any changes to these or just extracting this code practically as is (with minor changes to abstract params)? ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogIteratorUtils.scala: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.hudi.HoodieConversionUtils.toJavaOption +import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath +import org.apache.hudi.common.model.HoodieLogFile +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig +import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable +import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} + +import scala.collection.JavaConverters._ +import scala.util.Try + +object LogIteratorUtils { Review Comment: Let's consolidate this w/ LogFileIterator (let's name this `LogFileIterator`, there's no need for separate utils object) ########## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.util.Option; + +import java.io.Serializable; + +/** + * This contains all the information that retrieve the change data at a single file group and + * at a single commit. + * + * For [[cdcFileType]] = [[CDCFileTypeEnum.ADD_BASE_FILE]], [[cdcFile]] is a current version of + * the base file in the group, and [[beforeFileSlice]] is None. + * For [[cdcFileType]] = [[CDCFileTypeEnum.REMOVE_BASE_FILE]], [[cdcFile]] is null, + * [[beforeFileSlice]] is the previous version of the base file in the group. + * For [[cdcFileType]] = [[CDCFileTypeEnum.CDC_LOG_FILE]], [[cdcFile]] is a log file with cdc blocks. + * when enable the supplemental logging, both [[beforeFileSlice]] and [[afterFileSlice]] are None, + * otherwise these two are the previous and current version of the base file. + * For [[cdcFileType]] = [[CDCFileTypeEnum.MOR_LOG_FILE]], [[cdcFile]] is a normal log file and + * [[beforeFileSlice]] is the previous version of the file slice. + * For [[cdcFileType]] = [[CDCFileTypeEnum.REPLACED_FILE_GROUP]], [[cdcFile]] is null, + * [[beforeFileSlice]] is the current version of the file slice. + */ +public class HoodieCDCFileSplit implements Serializable { + + /** + * * the change type, which decide to how to retrieve the change data. more details see: [[CDCFileTypeEnum]] Review Comment: nit: Formatting ########## hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordReader.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.log; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.exception.HoodieIOException; + +import java.io.IOException; + +public class HoodieCDCLogRecordReader implements ClosableIterator<IndexedRecord> { Review Comment: This is an Iterator rather than Reader ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SafeAvroProjection.scala: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hudi + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} + +import org.apache.hudi.SafeAvroProjection.collectFieldOrdinals +import org.apache.hudi.common.util.ValidationUtils.checkState + +import scala.collection.JavaConverters._ + +// TODO extract to HoodieAvroSchemaUtils +abstract class AvroProjection extends (GenericRecord => GenericRecord) + +class SafeAvroProjection( Review Comment: nit: Please make sure we format params in-line w/ existing style formatting (i believe it'd be also captured in the style-guide): ``` def foo(a: Int, b: String, ...) ``` ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ########## @@ -0,0 +1,616 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cdc + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, LogIteratorUtils, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.HoodieConversionUtils._ +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.table.HoodieTableConfig._ +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieCDCLogRecordReader +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory +import org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import java.io.Closeable +import java.util.Properties +import java.util.function.Predicate +import java.util.stream.Collectors +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The split that will be processed by spark task. + */ +case class HoodieCDCFileGroupSplit( + commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)] +) + +/** + * The Spark [[Partition]]'s implementation. + */ +case class HoodieCDCFileGroupPartition( + index: Int, + split: HoodieCDCFileGroupSplit +) extends Partition + +class HoodieCDCRDD( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + parquetReader: PartitionedFile => Iterator[InternalRow], + originTableSchema: HoodieTableSchema, + cdcSchema: StructType, + requiredCdcSchema: StructType, + changes: Array[HoodieCDCFileGroupSplit]) + extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD { + + @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration + + private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) + + private val cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + metaClient.getTableConfig.cdcSupplementalLoggingMode + ) + + private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty) + + protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField) + .map { preCombineField => + HoodiePayloadConfig.newBuilder + .withPayloadOrderingField(preCombineField) + .build + .getProps + }.getOrElse(new Properties()) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val cdcPartition = split.asInstanceOf[HoodieCDCFileGroupPartition] + new CDCFileGroupIterator(cdcPartition.split, metaClient) + } + + override protected def getPartitions: Array[Partition] = { + changes.zipWithIndex.map{ case (split, index) => + HoodieCDCFileGroupPartition(index, split) + }.toArray + } + + private class CDCFileGroupIterator( + split: HoodieCDCFileGroupSplit, + metaClient: HoodieTableMetaClient + ) extends Iterator[InternalRow] with SparkAdapterSupport with AvroDeserializerSupport with Closeable { + + private val fs = metaClient.getFs.getFileSystem Review Comment: These vals should be lazy by default ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ########## @@ -0,0 +1,616 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cdc + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, LogIteratorUtils, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.HoodieConversionUtils._ +import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.table.HoodieTableConfig._ +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType._ +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieCDCLogRecordReader +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory +import org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import java.io.Closeable +import java.util.Properties +import java.util.function.Predicate +import java.util.stream.Collectors +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The split that will be processed by spark task. + */ +case class HoodieCDCFileGroupSplit( + commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)] +) + +/** + * The Spark [[Partition]]'s implementation. + */ +case class HoodieCDCFileGroupPartition( + index: Int, + split: HoodieCDCFileGroupSplit +) extends Partition + +class HoodieCDCRDD( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + parquetReader: PartitionedFile => Iterator[InternalRow], + originTableSchema: HoodieTableSchema, + cdcSchema: StructType, + requiredCdcSchema: StructType, + changes: Array[HoodieCDCFileGroupSplit]) + extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD { + + @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration + + private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) + + private val cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + metaClient.getTableConfig.cdcSupplementalLoggingMode + ) + + private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty) + + protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField) + .map { preCombineField => + HoodiePayloadConfig.newBuilder + .withPayloadOrderingField(preCombineField) + .build + .getProps + }.getOrElse(new Properties()) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val cdcPartition = split.asInstanceOf[HoodieCDCFileGroupPartition] + new CDCFileGroupIterator(cdcPartition.split, metaClient) + } + + override protected def getPartitions: Array[Partition] = { + changes.zipWithIndex.map{ case (split, index) => + HoodieCDCFileGroupPartition(index, split) + }.toArray + } + + private class CDCFileGroupIterator( + split: HoodieCDCFileGroupSplit, + metaClient: HoodieTableMetaClient + ) extends Iterator[InternalRow] with SparkAdapterSupport with AvroDeserializerSupport with Closeable { + + private val fs = metaClient.getFs.getFileSystem + + private val conf = new Configuration(confBroadcast.value.value) + + private val basePath = metaClient.getBasePathV2 + + private val tableConfig = metaClient.getTableConfig + + private val populateMetaFields = tableConfig.populateMetaFields() + + private val keyGenerator = { + val props = new TypedProperties() + props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, tableConfig.getKeyGeneratorClassName) + props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, tableConfig.getRecordKeyFieldProp) + props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, tableConfig.getPartitionFieldProp) + HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) + } + + private val recordKeyField: String = if (populateMetaFields) { + HoodieRecord.RECORD_KEY_METADATA_FIELD + } else { + val keyFields = metaClient.getTableConfig.getRecordKeyFields.get() + checkState(keyFields.length == 1) + keyFields.head + } + + private val preCombineFieldOpt: Option[String] = Option(metaClient.getTableConfig.getPreCombineField) + + private val tableState = { + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(props) + .build(); + HoodieTableState( + pathToString(basePath), + split.commitToChanges.map(_._1.getTimestamp).max, + recordKeyField, + preCombineFieldOpt, + usesVirtualKeys = false, + metaClient.getTableConfig.getPayloadClass, + metadataConfig + ) + } + + private lazy val mapper: ObjectMapper = { + val _mapper = new ObjectMapper + _mapper.setSerializationInclusion(Include.NON_ABSENT) + _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + _mapper.registerModule(DefaultScalaModule) + _mapper + } + + protected override val avroSchema: Schema = new Schema.Parser().parse(originTableSchema.avroSchemaStr) + + protected override val structTypeSchema: StructType = originTableSchema.structTypeSchema + + private val serializer = sparkAdapter.createAvroSerializer(originTableSchema.structTypeSchema, + avroSchema, nullable = false) + + private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(avroSchema) + + private val cdcAvroSchema: Schema = HoodieCDCUtils.schemaBySupplementalLoggingMode( + cdcSupplementalLoggingMode, + HoodieAvroUtils.removeMetadataFields(avroSchema) + ) + + private val cdcSparkSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(cdcAvroSchema) + + /** + * the deserializer used to convert the CDC GenericRecord to Spark InternalRow. + */ + private val cdcRecordDeserializer: HoodieAvroDeserializer = { + sparkAdapter.createAvroDeserializer(cdcAvroSchema, cdcSparkSchema) + } + + private val projection: UnsafeProjection = generateUnsafeProjection(cdcSchema, requiredCdcSchema) + + // iterator on cdc file + private val cdcFileIter = split.commitToChanges.sortBy(_._1).iterator + + // The instant that is currently being processed + private var currentInstant: HoodieInstant = _ + + // The change file that is currently being processed + private var currentChangeFile: HoodieCDCFileSplit = _ + + /** + * two cases will use this to iterator the records: Review Comment: nit: Let's make sure comments are consistent as well and start w/ capital (please ditto for all comments) ########## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.util.Option; + +import java.io.Serializable; + +/** + * This contains all the information that retrieve the change data at a single file group and + * at a single commit. + * + * For [[cdcFileType]] = [[CDCFileTypeEnum.ADD_BASE_FILE]], [[cdcFile]] is a current version of + * the base file in the group, and [[beforeFileSlice]] is None. + * For [[cdcFileType]] = [[CDCFileTypeEnum.REMOVE_BASE_FILE]], [[cdcFile]] is null, + * [[beforeFileSlice]] is the previous version of the base file in the group. + * For [[cdcFileType]] = [[CDCFileTypeEnum.CDC_LOG_FILE]], [[cdcFile]] is a log file with cdc blocks. + * when enable the supplemental logging, both [[beforeFileSlice]] and [[afterFileSlice]] are None, + * otherwise these two are the previous and current version of the base file. + * For [[cdcFileType]] = [[CDCFileTypeEnum.MOR_LOG_FILE]], [[cdcFile]] is a normal log file and + * [[beforeFileSlice]] is the previous version of the file slice. + * For [[cdcFileType]] = [[CDCFileTypeEnum.REPLACED_FILE_GROUP]], [[cdcFile]] is null, + * [[beforeFileSlice]] is the current version of the file slice. + */ +public class HoodieCDCFileSplit implements Serializable { + + /** + * * the change type, which decide to how to retrieve the change data. more details see: [[CDCFileTypeEnum]] + * */ + private HoodieCDCLogicalFileType cdcFileType; + + /** + * the file that the change data can be parsed from. + */ + private String cdcFile; + + /** + * the file slice that are required when retrieve the pre_image data. Review Comment: Let's align terms w/ what we use elsewhere (before/after) ########## hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordReader.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.log; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.exception.HoodieIOException; + +import java.io.IOException; + +public class HoodieCDCLogRecordReader implements ClosableIterator<IndexedRecord> { + + private final HoodieLogFile cdcLogFile; + + private final HoodieLogFormat.Reader reader; + + private ClosableIterator<IndexedRecord> itr; + + public HoodieCDCLogRecordReader( + FileSystem fs, + Path cdcLogPath, + Schema cdcSchema) throws IOException { + this.cdcLogFile = new HoodieLogFile(fs.getFileStatus(cdcLogPath)); + this.reader = new HoodieLogFileReader(fs, cdcLogFile, cdcSchema, + HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false); + } + + @Override + public boolean hasNext() { + if (itr == null || !itr.hasNext()) { Review Comment: nit: If we flip this conditional we can decrease nested-ness ########## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCLogicalFileType.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +/** + * Here define four cdc file types. The different cdc file type will decide which file will be + * used to extract the change data, and how to do this. + * + * CDC_LOG_FILE: Review Comment: Let's make sure these are in-sync w/ the RFC @xushiyan did we end up revisiting this terminology in the RFC? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
