vinothchandar commented on a change in pull request #1804:
URL: https://github.com/apache/hudi/pull/1804#discussion_r478483860
########## File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java ##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+/**
+ * Hoodie merge handle which writes records (new inserts or updates) sorted by their key.
+ *
+ * The implementation performs a merge-sort by comparing the key of the record being written to the list of
+ * keys in newRecordKeys (sorted in-memory).
+ */
+public class HoodieSortedMergeHandle<T extends HoodieRecordPayload> extends HoodieMergeHandle<T> {
+
+  private Queue<String> newRecordKeysSorted = new PriorityQueue<>();
+
+  public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
+      Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
+    newRecordKeysSorted.addAll(keyToNewRecords.keySet());
+  }
+
+  /**
+   * Called by compactor code path.
+   */
+  public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
+      Map<String, HoodieRecord<T>> keyToNewRecordsOrig, String partitionPath, String fileId,
+      HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecordsOrig, partitionPath, fileId, dataFileToBeMerged,
+        sparkTaskContextSupplier);
+
+    newRecordKeysSorted.addAll(keyToNewRecords.keySet());
+  }
+
+  /**
+   * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
+   */
+  @Override
+  public void write(GenericRecord oldRecord) {
+    String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+
+    // To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
+    // the oldRecord's key.
+    while (!newRecordKeysSorted.isEmpty() && newRecordKeysSorted.peek().compareTo(key) <= 0) {
+      String keyToPreWrite = newRecordKeysSorted.remove();

Review comment:
I am thinking we don't need the map in HoodieMergeHandle or the priority queue. The records that have changed, i.e. the input iterator, are already sorted; let's call it `inputItr`. So we can just compare the record being written with `inputItr.next()` and write out the smaller one; if they are equal, we call the payload to merge.
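A rough sketch of that streaming merge (hypothetical simplified types, not the actual Hudi API; `Rec` stands in for `HoodieRecord`, and the "payload merge" step is reduced to the incoming value winning). Both iterators are assumed sorted by key:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch only: a streaming two-way merge that avoids buffering the new-record
// keys in a map or PriorityQueue. Only the current record from each iterator
// is held in memory at any time.
public class SortedMergeSketch {

  // Hypothetical stand-in for HoodieRecord, reduced to a (key, value) pair.
  public record Rec(String key, String value) {}

  // Both iterators are assumed to yield records in ascending key order.
  public static List<Rec> merge(Iterator<Rec> oldItr, Iterator<Rec> inputItr) {
    List<Rec> out = new ArrayList<>();
    Rec oldRec = oldItr.hasNext() ? oldItr.next() : null;
    Rec newRec = inputItr.hasNext() ? inputItr.next() : null;
    while (oldRec != null || newRec != null) {
      if (newRec == null || (oldRec != null && oldRec.key().compareTo(newRec.key()) < 0)) {
        out.add(oldRec); // untouched old record: copy it through
        oldRec = oldItr.hasNext() ? oldItr.next() : null;
      } else if (oldRec == null || oldRec.key().compareTo(newRec.key()) > 0) {
        out.add(newRec); // brand-new insert: written in key order
        newRec = inputItr.hasNext() ? inputItr.next() : null;
      } else {
        // Same key: this is where the payload merge would be invoked;
        // in this sketch the incoming value simply wins.
        out.add(new Rec(oldRec.key(), newRec.value()));
        oldRec = oldItr.hasNext() ? oldItr.next() : null;
        newRec = inputItr.hasNext() ? inputItr.next() : null;
      }
    }
    return out;
  }
}
```

Since only one record per iterator is held at a time, the extra memory is constant regardless of how many records changed.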
This will avoid any kind of memory overhead.

########## File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java ##########
@@ -94,6 +100,16 @@ public Builder parquetPageSize(int pageSize) {
     return this;
   }
 
+  public Builder hfileMaxFileSize(long maxFileSize) {
+    props.setProperty(HFILE_FILE_MAX_BYTES, String.valueOf(maxFileSize));

Review comment:
Not following, sorry. Are you suggesting having a single config or two? We need a config per usage of HFile, so we can control the base file size for data, metadata, and record index separately. We cannot have a generic base.file.size or hfile.size config here, at this level IMO. cc @prashantwason

########## File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java ##########
+  public Builder hfileMaxFileSize(long maxFileSize) {
+    props.setProperty(HFILE_FILE_MAX_BYTES, String.valueOf(maxFileSize));

Review comment:
> I see that you split this function into file specific functions. That doable but with more base file formats added, it may be cumbersome and verbose to keep adding the .limitXXXFileSize for specific formats.

@prashantwason I think we eventually need a config "per use" of base file (data, metadata, index), since people may want to control them differently. So, in that sense, this has to change anyway. Yes, the change is backwards compatible for RDD clients, which I thought was okay since it's just Uber; if you prefer not to have that, lmk. IMO, it's about time we cleaned these up, given we are moving to having way more base files/tables in the mix.

########## File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java ##########
@@ -90,9 +91,10 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
    * Called by the compactor code path.
   */
  public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-      String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator, SparkTaskContextSupplier sparkTaskContextSupplier) {
+      String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,

Review comment:
Ideally not. The more the compactor can function without knowing the base file type specifics, the better.

########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java ##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieHFileReader;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nonnull;
+
+/**
+ * HoodieHFileDataBlock contains a list of records stored inside an HFile format. It is used with the HFile
+ * base file format.
+ */
+public class HoodieHFileDataBlock extends HoodieDataBlock {
+  private static final Logger LOG = LogManager.getLogger(HoodieHFileDataBlock.class);
+  private static Compression.Algorithm compressionAlgorithm = Compression.Algorithm.GZ;
+  private static int blockSize = 1 * 1024 * 1024;
+
+  public HoodieHFileDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
+      @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+      @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
+      FSDataInputStream inputStream, boolean readBlockLazily) {
+    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
+  }
+
+  public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
+      boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
+      Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
+    super(content, inputStream, readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
+        footer);
+  }
+
+  public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
+    super(records, header, new HashMap<>());
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.HFILE_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords() throws IOException {
+    HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).withCompression(compressionAlgorithm)
+        .build();
+    Configuration conf = new Configuration();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+    HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+        .withOutputStream(ostream).withFileContext(context).create();
+
+    // Serialize records into bytes
+    Map<String, byte[]> sortedRecordsMap = new TreeMap<>();

Review comment:
It's a TreeMap; it's sorted/ordered.

########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java ##########
+    Map<String, byte[]> sortedRecordsMap = new TreeMap<>();

Review comment:
> Does not matter where the sorting is performed. It will definitely be referable in the partitioner.

@prashantwason for some reason I cannot find your comment overlaid here (are you using the review feature?). Anyway, what I meant was just `rdd.repartitionAndSort...` for the AppendHandle path as well. There is no generic partitioner in Hudi, since the ones we have are all serving different purposes.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
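As an aside on the TreeMap point in the thread above: `java.util.TreeMap` iterates its keys in ascending natural order regardless of insertion order, which is why no explicit sort is needed before handing the records to the HFile writer. A tiny illustration (standalone demo class, not part of the PR):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// TreeMap keeps keys sorted (natural ordering by default), so iterating
// keySet() yields them in ascending order no matter the insertion order.
public class TreeMapOrderDemo {
  public static List<String> insertionVsIteration() {
    Map<String, byte[]> sortedRecordsMap = new TreeMap<>();
    sortedRecordsMap.put("key3", new byte[0]); // deliberately inserted out of order
    sortedRecordsMap.put("key1", new byte[0]);
    sortedRecordsMap.put("key2", new byte[0]);
    return new ArrayList<>(sortedRecordsMap.keySet()); // ascending key order
  }
}
```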
