codope commented on code in PR #9819:
URL: https://github.com/apache/hudi/pull/9819#discussion_r1352457874
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -736,6 +736,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Backup instants removed during rollback and restore (useful for debugging)");
+ public static final ConfigProperty<Boolean> FILE_GROUP_RAEDER_ENABLED = ConfigProperty
+     .key("hoodie.file.group.reader.enabled")
Review Comment:
How about renaming it to indicate merging based on record positions, something like `hoodie.reader.use.record.positions`? Also, should it be in `HoodieWriteConfig`? Maybe move it to `DataSourceReadOptions`, though it is not strictly a datasource option.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -756,6 +792,50 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpec
}
}
+ private void processDataBlockBasedOnPosition(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
+   checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(),
+       "Either partition-name override or partition-path field had to be present");
+
+   Option<Pair<String, String>> recordKeyPartitionPathFieldPair = populateMetaFields
+       ? Option.empty()
+       : Option.of(Pair.of(recordKeyField, partitionPathFieldOpt.orElse(null)));
+
+   // Prepare position information for the records in the block.
+   String blockPositionStr = dataBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.RECORD_POSITIONS);
+   // TODO: Use keys to filter this list.
+   List<Integer> blockPositions = new ArrayList<>();
+   prepareRecordPositions(
Review Comment:
It's possible that `blockPositions` is still empty after `prepareRecordPositions`. Do we need some validation?
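Agreed; a simple guard could short-circuit back to the key-based path when no positions survive. A minimal sketch (the helper name is hypothetical, not part of this PR):

```java
import java.util.Collections;
import java.util.List;

public class PositionGuard {
    // Hypothetical helper: position-based merging only makes sense when the
    // data block actually yielded positions; otherwise the caller should fall
    // back to the key-based merge path.
    static boolean shouldFallBackToKeyBasedMerge(List<Integer> blockPositions) {
        return blockPositions == null || blockPositions.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(shouldFallBackToKeyBasedMerge(Collections.emptyList()));
        System.out.println(shouldFallBackToKeyBasedMerge(Collections.singletonList(3)));
    }
}
```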
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -410,6 +418,34 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
}
}
+ private static void prepareRecordPositions(String blockPositionStr,
+                                            List<Integer> blockPositions,
+                                            Set<Integer> globalPositions,
+                                            boolean positionBasedMergeEnabled,
+                                            boolean useEventOrdering) throws IOException {
Review Comment:
When will `useEventOrdering` be true? Right now I see this is always called
with `false`. Should a separate boolean member variable be added so that it can
be set from a higher layer?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -736,6 +736,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Backup instants removed during rollback and restore (useful for debugging)");
+ public static final ConfigProperty<Boolean> FILE_GROUP_RAEDER_ENABLED = ConfigProperty
+     .key("hoodie.file.group.reader.enabled")
+     .defaultValue(false)
Review Comment:
Also, please add `.sinceVersion("1.0.0")`
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -410,6 +418,34 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
}
}
+ private static void prepareRecordPositions(String blockPositionStr,
+                                            List<Integer> blockPositions,
+                                            Set<Integer> globalPositions,
+                                            boolean positionBasedMergeEnabled,
+                                            boolean useEventOrdering) throws IOException {
+   if (!positionBasedMergeEnabled || blockPositionStr == null || blockPositionStr.isEmpty()) {
+     return;
+   }
+
+   Roaring64NavigableMap posBitmap = PositionUtils.deserializeCodecStringWithRoaring64NavigableMap(blockPositionStr);
Review Comment:
Can we not directly use `HoodieLogBlock#getRecordPositions`? Something like
below:
```
Roaring64NavigableMap recordPositions = logBlock.getRecordPositions();
if (recordPositions != null && !recordPositions.isEmpty()) {
recordPositions.iterator();
...
}
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java:
##########
@@ -230,6 +242,34 @@ public void processNextRecord(T record, Map<String, Object> metadata) throws IOE
}
}
+ @Override
+ public void processNextRecordWithPosition(T record,
+                                           Map<String, Object> metadata,
+                                           int sequenceNo,
+                                           List<Integer> positions) throws IOException {
+   String key = (String) metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
+   int position = positions.get(sequenceNo);
+   // No position information is found for this record.
+   if (position < 0) {
+     throw new IOException("No position information is found for the record " + key);
+   }
+
+   Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = positionToRecords.get(position);
+   if (existingRecordMetadataPair != null) {
+     HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(
+         readerContext.constructHoodieRecord(Option.of(record), metadata, readerSchema),
+         readerSchema,
+         readerContext.constructHoodieRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema),
+         readerSchema,
+         this.getPayloadProps()).get().getLeft();
Review Comment:
So payload will determine whether event time order is needed or not. Please
check my comment regarding `useEventOrdering` boolean argument in
`prepareRecordPositions` method.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java:
##########
@@ -230,6 +242,34 @@ public void processNextRecord(T record, Map<String, Object> metadata) throws IOE
}
}
+ @Override
+ public void processNextRecordWithPosition(T record,
+                                           Map<String, Object> metadata,
+                                           int sequenceNo,
+                                           List<Integer> positions) throws IOException {
+   String key = (String) metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
+   int position = positions.get(sequenceNo);
Review Comment:
Can `positions` be empty? If so, `positions.get(sequenceNo)` would throw an `IndexOutOfBoundsException` before the check below.
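A bounds-checked lookup along these lines could make the failure explicit instead (sketch only; the method name is hypothetical):

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class PositionLookup {
    // Hypothetical bounds-checked lookup: guard against an empty or short
    // positions list before indexing by sequence number.
    static int positionAt(List<Integer> positions, int sequenceNo) throws IOException {
        if (positions == null || sequenceNo < 0 || sequenceNo >= positions.size()) {
            throw new IOException("No position information for sequence number " + sequenceNo);
        }
        return positions.get(sequenceNo);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(positionAt(Arrays.asList(4, 8, 15), 1));
    }
}
```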
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -138,6 +140,9 @@ public abstract class BaseHoodieLogRecordReader<T> {
private final List<String> validBlockInstants = new ArrayList<>();
// Use scanV2 method.
private final boolean enableOptimizedLogBlocksScan;
+ private final boolean positionBasedMergeEnabled;
+ // Use it to check if a position has been processed for event time ordering.
+ private Set<Integer> globalPositions = new HashSet<>();
Review Comment:
Make it `final`?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -74,13 +75,16 @@ public final class HoodieFileGroupReader<T> implements Closeable {
// Length of bytes to read from the base file
private final long length;
// Key to record and metadata mapping from log files
+ private final boolean positionBasedWriteEnabled;
private final Map<String, Pair<Option<T>, Map<String, Object>>> logFileRecordMapping = new HashMap<>();
+ private final Map<Integer, Pair<Option<T>, Map<String, Object>>> positionToRecords = new HashMap<>();
Review Comment:
I'm wondering if we should use `ConcurrentHashMap`? There can be concurrent calls to `scanLogFiles` when I want to read two file slices at the same time.
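A minimal sketch of the concern (slice contents hypothetical): two concurrent scans filling one shared map stay consistent only with a concurrent map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentScanSketch {
    // Simulates two file slices being scanned at the same time, both writing
    // into the shared position-to-record mapping. With a plain HashMap the
    // concurrent puts could corrupt the table; ConcurrentHashMap stays safe.
    static Map<Integer, String> fillConcurrently() {
        Map<Integer, String> positionToRecords = new ConcurrentHashMap<>();
        Thread sliceA = new Thread(() -> {
            for (int pos = 0; pos < 1000; pos++) positionToRecords.put(pos, "a-" + pos);
        });
        Thread sliceB = new Thread(() -> {
            for (int pos = 1000; pos < 2000; pos++) positionToRecords.put(pos, "b-" + pos);
        });
        sliceA.start();
        sliceB.start();
        try {
            sliceA.join();
            sliceB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return positionToRecords;
    }

    public static void main(String[] args) {
        System.out.println(fillConcurrently().size());
    }
}
```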
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java:
##########
@@ -230,6 +242,34 @@ public void processNextRecord(T record, Map<String, Object> metadata) throws IOE
}
}
+ @Override
+ public void processNextRecordWithPosition(T record,
+                                           Map<String, Object> metadata,
+                                           int sequenceNo,
+                                           List<Integer> positions) throws IOException {
Review Comment:
I think the record positions are `long`, right?
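Right; `Roaring64NavigableMap` holds 64-bit positions, and narrowing to `int` silently wraps once a position exceeds `Integer.MAX_VALUE`. A quick illustration:

```java
public class PositionWidthSketch {
    static int narrow(long position) {
        return (int) position; // silently wraps for positions beyond Integer.MAX_VALUE
    }

    public static void main(String[] args) {
        long position = 3_000_000_000L; // a plausible row position in a very large file group
        System.out.println(position);
        System.out.println(narrow(position)); // negative after the wrap
    }
}
```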
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -113,7 +118,8 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                              Schema avroSchema,
                              TypedProperties props,
                              long start,
-                             long length) {
+                             long length,
+                             boolean positiionBasedWriteEnabled) {
Review Comment:
Sounds confusing to have `...writeEnabled` in a reader class. Maybe rename
to `shouldUseRecordPositions`?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java:
##########
@@ -210,11 +221,32 @@ public List<String> getFileIds(String partitionPath) {
}
public void checkDataEquality(int numRecords) {
- List<Row> rows = spark()
+ Map<String, String> properties = new HashMap<>();
+ properties.put(
+ "hoodie.datasource.write.record.merger.impls",
Review Comment:
Let's use the config variables defined for these keys.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/PositionUtils.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.util.Base64CodecUtil;
+
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+public class PositionUtils {
Review Comment:
Thanks for extracting to a separate util class. Let's rename to
`RecordPositionUtils`?
Also, please add unit tests for the publoc methods here.
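A round-trip check is the natural shape for those tests. The sketch below mimics the encode/decode pair with a plain length-prefixed-longs codec so it stays self-contained; the real tests should of course call `PositionUtils` and `Roaring64NavigableMap` directly:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;

public class PositionCodecRoundTrip {
    // Stand-in encoder: length-prefixed longs, Base64-wrapped, mirroring the
    // serialize/deserializeCodecString pair in PositionUtils.
    static String encode(List<Long> positions) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(baos);
            dos.writeInt(positions.size());
            for (long p : positions) {
                dos.writeLong(p);
            }
            return Base64.getEncoder().encodeToString(baos.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<Long> decode(String content) {
        try {
            DataInputStream dis = new DataInputStream(
                new ByteArrayInputStream(Base64.getDecoder().decode(content)));
            int n = dis.readInt();
            List<Long> out = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                out.add(dis.readLong());
            }
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<Long> positions = Arrays.asList(0L, 7L, 3_000_000_000L);
        // Round trip: decode(encode(x)) must equal x.
        System.out.println(decode(encode(positions)).equals(positions));
    }
}
```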
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/FileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import kotlin.NotImplementedError
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.engine.HoodieReaderContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile, HoodieRecord}
+import org.apache.hudi.common.table.read.HoodieFileGroupReader
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils, HoodieTableSchema, HoodieTableState, LogFileIterator, MergeOnReadSnapshotRelation, PartitionFileSliceMapping, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.asScalaIteratorConverter
+
+class FileGroupReaderBasedParquetFileFormat(tableState: Broadcast[HoodieTableState],
+                                            tableSchema: Broadcast[HoodieTableSchema],
+                                            tableName: String,
+                                            mergeType: String,
+                                            mandatoryFields: Seq[String],
+                                            isMOR: Boolean,
+                                            isBootstrap: Boolean,
+                                            positionBasedWriteEnabled: Boolean) extends ParquetFileFormat with SparkAdapterSupport {
+  var isProjected = false
+
+  /**
+   * Support batch needs to remain consistent, even if one side of a bootstrap merge can support
+   * while the other side can't
+   */
+  private var supportBatchCalled = false
+  private var supportBatchResult = false
+
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+    if (!supportBatchCalled) {
+      supportBatchCalled = true
+      supportBatchResult = !isMOR && super.supportBatch(sparkSession, schema)
+    }
+    supportBatchResult
+  }
+
+ override def isSplitable(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ path: Path): Boolean = {
+ false
+ }
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val requiredSchemaWithMandatory = generateRequiredSchemaWithMandatory(requiredSchema, dataSchema)
+    val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f => HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
+    val requiredMeta = StructType(requiredSchemaSplits._1)
+    val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
+    val (baseFileReader, preMergeBaseFileReader, _, _) = buildFileReaders(sparkSession,
+      dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf, requiredSchemaWithMandatory,
+      requiredWithoutMeta, requiredMeta)
+    val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    (file: PartitionedFile) => {
+      file.partitionValues match {
+        case fileSliceMapping: PartitionFileSliceMapping =>
+          val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+          if (FSUtils.isLogFile(filePath)) {
+            // TODO: Use FileGroupReader here.
+            throw new NotImplementedError("Not support reading with only log files")
+          } else {
+            fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) match {
+              case Some(fileSlice) =>
+                val hoodieBaseFile = fileSlice.getBaseFile.get()
+                val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
+                val partitionValues = fileSliceMapping.getInternalRow
+                val logFiles = getLogFilesFromSlice(fileSlice)
+                if (requiredSchemaWithMandatory.isEmpty) {
+                  val baseFile = createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
+                  // TODO: Use FileGroupReader here.
+                  baseFileReader(baseFile)
+                } else if (bootstrapFileOpt.isPresent) {
+                  // TODO: Use FileGroupReader here.
+                  throw new NotImplementedError("Not support reading bootstrap file")
+                } else {
+                  if (logFiles.isEmpty) {
+                    throw new IllegalStateException(
+                      "should not be here since file slice should not have been broadcasted "
+                        + "since it has no log or data files")
+                  }
+                  buildFileGroupIterator(
+                    preMergeBaseFileReader,
+                    partitionValues,
+                    hoodieBaseFile,
+                    logFiles,
+                    requiredSchemaWithMandatory,
+                    broadcastedHadoopConf.value.value,
+                    hoodieBaseFile.getFileLen,
+                    positionBasedWriteEnabled
+                  )
+                }
+                // TODO: Use FileGroupReader here.
Review Comment:
Let's file follow-up JIRAs for the different query types.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/PositionUtils.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.util.Base64CodecUtil;
+
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+public class PositionUtils {
+  public static byte[] serializeWithRoaring64NavigableMap(int[] positions) throws IOException {
+    Roaring64NavigableMap bitmap = new Roaring64NavigableMap();
+    for (int pos : positions) {
+      bitmap.add(pos);
+    }
+    bitmap.runOptimize();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    bitmap.serializePortable(dos);
+    return baos.toByteArray();
+  }
+
+  public static byte[] serializeWithRoaring64NavigableMap(List<Long> positions) throws IOException {
+    Roaring64NavigableMap bitmap = new Roaring64NavigableMap();
+    positions.forEach(bitmap::add);
+    bitmap.runOptimize();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    bitmap.serializePortable(dos);
+    return baos.toByteArray();
+  }
+
+  public static Roaring64NavigableMap deserializeWithRoaring64NavigableMap(byte[] content) throws IOException {
+    Roaring64NavigableMap bitmap = new Roaring64NavigableMap();
+    ByteArrayInputStream bais = new ByteArrayInputStream(content);
+    DataInputStream dis = new DataInputStream(bais);
+    bitmap.deserializePortable(dis);
+    return bitmap;
+  }
+
+  public static String serializeCodecStringWithRoaring64NavigableMap(List<Long> positions) throws IOException {
+    return Base64CodecUtil.encode(serializeWithRoaring64NavigableMap(positions));
+  }
+
+  public static Roaring64NavigableMap deserializeCodecStringWithRoaring64NavigableMap(String content) throws IOException {
+    return deserializeWithRoaring64NavigableMap(Base64CodecUtil.decode(content));
+  }
+
+  public static String serializeCodecStringWithRoaring64NavigableMap(int[] positions) throws IOException {
Review Comment:
Do we need it when we already have
`serializeCodecStringWithRoaring64NavigableMap(List<Long> positions)` above?
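Agreed, it looks redundant; the `int[]` overload could simply widen and delegate. A sketch of the conversion:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class WidenPositions {
    // Widening an int[] to List<Long> lets the int[] overload delegate to the
    // existing List<Long>-based serializeCodecStringWithRoaring64NavigableMap.
    static List<Long> toLongList(int[] positions) {
        return Arrays.stream(positions).asLongStream().boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(toLongList(new int[]{1, 5, 9}));
    }
}
```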
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala:
##########
@@ -150,6 +150,12 @@ class NewHoodieParquetFileFormatUtils(val sqlContext: SQLContext,
     sqlContext.getConf(config.key(), defaultValueOption.getOrElse(config.defaultValue())))
 }
+  private def checkIfAConfigurationEnabled(config: ConfigProperty[java.lang.Boolean],
Review Comment:
Maybe move this to `ConfigUtils`?
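For reference, the general shape such a `ConfigUtils` helper usually takes (signature hypothetical; the real version would accept Hudi's `ConfigProperty` and its typed default):

```java
import java.util.Properties;

public class ConfigEnabledSketch {
    // Hypothetical ConfigUtils-style helper: resolves a boolean flag from
    // properties, falling back to the supplied default when the key is unset.
    static boolean isConfigEnabled(Properties props, String key, boolean defaultValue) {
        String value = props.getProperty(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("hoodie.file.group.reader.enabled", "true");
        System.out.println(isConfigEnabled(props, "hoodie.file.group.reader.enabled", false));
        System.out.println(isConfigEnabled(props, "hoodie.missing.key", false));
    }
}
```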
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]