umehrot2 commented on a change in pull request #1459: [HUDI-418] [HUDI-421] Bootstrap Index using HFile and File System View Changes with unit-test URL: https://github.com/apache/incubator-hudi/pull/1459#discussion_r403406705
########## File path: hudi-common/src/main/java/org/apache/hudi/common/bootstrap/BootstrapIndex.java ########## @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bootstrap; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hudi.avro.model.BootstrapIndexInfo; +import org.apache.hudi.avro.model.BootstrapPartitionMetadata; +import 
org.apache.hudi.avro.model.BootstrapSourceFilePartitionInfo; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.BootstrapSourceFileMapping; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Maintains mapping from hudi file id (which contains skeleton file) to external base file. + * It maintains 2 physical indices. + * (a) At partition granularity to lookup all indices for each partition. + * (b) At file-group granularity to lookup bootstrap mapping for an individual file-group. + * + * This implementation uses HFile as physical storage of index. For the initial run, bootstrap + * mapping for the entire dataset resides in a single file but care has been taken in naming + * the index files in the same way as Hudi data files so that we can reuse file-system abstraction + * on these index files to manage multiple file-groups. + */ + +public class BootstrapIndex implements Serializable, AutoCloseable { + + private static final Logger LOG = LogManager.getLogger(BootstrapIndex.class); + + public static final String BOOTSTRAP_INDEX_FILE_ID = "00000000-0000-0000-0000-000000000000-0"; + + // Used as naming extensions. + public static final String BOOTSTRAP_INDEX_FILE_TYPE = "hfile"; + + // Additional Metadata written to HFiles. + public static final byte[] INDEX_INFO_KEY = Bytes.toBytes("INDEX_INFO"); + + private final HoodieTableMetaClient metaClient; + // Base Path of external files. 
+ private final String sourceBasePath; + // Well Known Paths for indices + private final String indexByPartitionPath; + private final String indexByFileIdPath; + // Flag to identify if Bootstrap Index is empty or not + private final boolean isBootstrapped; + + // Index Readers + private transient HFile.Reader indexByPartitionReader; + private transient HFile.Reader indexByFileIdReader; + + // Bootstrap Index Info + private transient BootstrapIndexInfo bootstrapIndexInfo; + + public BootstrapIndex(HoodieTableMetaClient metaClient) { + this.metaClient = metaClient; + + try { + metaClient.initializeBootstrapDirsIfNotExists(); + Path indexByPartitionPath = getIndexByPartitionPath(metaClient); + Path indexByFilePath = getIndexByFileIdPath(metaClient); + if (metaClient.getFs().exists(indexByPartitionPath) && metaClient.getFs().exists(indexByFilePath)) { + this.indexByPartitionPath = indexByPartitionPath.toString(); + this.indexByFileIdPath = indexByFilePath.toString(); + this.sourceBasePath = getBootstrapIndexInfo().getSourceBasePath(); + this.isBootstrapped = true; + } else { + this.indexByPartitionPath = null; + this.indexByFileIdPath = null; + this.sourceBasePath = null; + this.isBootstrapped = false; + } + LOG.info("Loaded BootstrapIndex with source base path :" + sourceBasePath); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + } + + public BootstrapIndexInfo getBootstrapIndexInfo() { + if (null == bootstrapIndexInfo) { + synchronized (this) { + if (null == bootstrapIndexInfo) { + try { + bootstrapIndexInfo = fetchBootstrapIndexInfo(); + } catch (IOException ioe) { + throw new HoodieException(ioe.getMessage(), ioe); + } + } + } + } + return bootstrapIndexInfo; + } + + private static Path getIndexByPartitionPath(HoodieTableMetaClient metaClient) { + return new Path(metaClient.getBootstrapIndexByPartitionFolderName(), + FSUtils.makeBootstrapIndexFileName(HoodieTimeline.BOOTSTRAP_INSTANT_TS, BOOTSTRAP_INDEX_FILE_ID, + 
BOOTSTRAP_INDEX_FILE_TYPE)); + } + + private static Path getIndexByFileIdPath(HoodieTableMetaClient metaClient) { + return new Path(metaClient.getBootstrapIndexByFileIdFolderNameFolderName(), + FSUtils.makeBootstrapIndexFileName(HoodieTimeline.BOOTSTRAP_INSTANT_TS, BOOTSTRAP_INDEX_FILE_ID, + BOOTSTRAP_INDEX_FILE_TYPE)); + } + + private BootstrapIndexInfo fetchBootstrapIndexInfo() throws IOException { + return TimelineMetadataUtils.deserializeAvroMetadata( + getIndexByPartitionReader().loadFileInfo().get(INDEX_INFO_KEY), + BootstrapIndexInfo.class); + } + + private HFile.Reader getIndexByPartitionReader() { + if (null == indexByPartitionReader) { + synchronized (this) { + if (null == indexByPartitionReader) { + LOG.info("Opening partition index :" + indexByPartitionPath); + this.indexByPartitionReader = + createReader(indexByPartitionPath, metaClient.getHadoopConf(), metaClient.getFs()); + } + } + } + return indexByPartitionReader; + } + + private HFile.Reader getIndexByFileIdReader() { + if (null == indexByFileIdReader) { + synchronized (this) { + if (null == indexByFileIdReader) { + LOG.info("Opening fileId index :" + indexByFileIdPath); + this.indexByFileIdReader = + createReader(indexByFileIdPath, metaClient.getHadoopConf(), metaClient.getFs()); + } + } + } + return indexByFileIdReader; + } + + public List<String> getAllBootstrapPartitionKeys() { + HFileScanner scanner = getIndexByPartitionReader().getScanner(true, true); + return getAllKeys(scanner); + } + + public List<String> getAllBootstrapFileIdKeys() { + HFileScanner scanner = getIndexByFileIdReader().getScanner(true, true); + return getAllKeys(scanner); + } + + private List<String> getAllKeys(HFileScanner scanner) { + List<String> keys = new ArrayList<>(); + if (isBootstrapped) { + try { + boolean available = scanner.seekTo(); + while (available) { + keys.add(CellUtil.getCellKeyAsString(scanner.getKeyValue())); + available = scanner.next(); + } + } catch (IOException ioe) { + throw new 
HoodieIOException(ioe.getMessage(), ioe); + } + } + return keys; + } + + /** + * Drop Bootstrap Index. + */ + public void dropIndex() { + try { + Path[] indexPaths = new Path[] { new Path(indexByPartitionPath), new Path(indexByFileIdPath) }; + for (Path indexPath : indexPaths) { + if (metaClient.getFs().exists(indexPath)) { + LOG.info("Dropping bootstrap index. Deleting file : " + indexPath); + metaClient.getFs().delete(indexPath); + } + } + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + } + + /** + * Lookup bootstrap index by partition. + * @param partition Partition to lookup + * @return + */ + public List<BootstrapSourceFileMapping> getBootstrapInfoForPartition(String partition) { + if (isBootstrapped) { + try { + HFileScanner scanner = getIndexByPartitionReader().getScanner(true, true); + KeyValue keyValue = new KeyValue(Bytes.toBytes(getPartitionKey(partition)), new byte[0], new byte[0], + HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, new byte[0]); + if (scanner.seekTo(keyValue) == 0) { + ByteBuffer readValue = scanner.getValue(); + byte[] valBytes = Bytes.toBytes(readValue); + BootstrapPartitionMetadata metadata = + TimelineMetadataUtils.deserializeAvroMetadata(valBytes, BootstrapPartitionMetadata.class); + return metadata.getHudiFileIdToSourceFile().entrySet().stream() + .map(e -> new BootstrapSourceFileMapping(sourceBasePath, metadata.getSourcePartitionPath(), partition, + e.getValue(), e.getKey())).collect(Collectors.toList()); + } else { + LOG.info("No value found for partition key (" + partition + ")"); + return new ArrayList<>(); + } + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + } + return new ArrayList<>(); + } + + public String getSourceBasePath() { + return sourceBasePath; + } + + /** + * Lookup Bootstrap index by file group ids. 
+ * @param ids File Group Ids + * @return + */ + public Map<HoodieFileGroupId, BootstrapSourceFileMapping> getBootstrapInfoForFileIds(List<HoodieFileGroupId> ids) { + Map<HoodieFileGroupId, BootstrapSourceFileMapping> result = new HashMap<>(); + // Arrange input Keys in sorted order for 1 pass scan + List<HoodieFileGroupId> fileGroupIds = new ArrayList<>(ids); + Collections.sort(fileGroupIds); + if (isBootstrapped) { + try { + HFileScanner scanner = getIndexByFileIdReader().getScanner(true, true); + for (HoodieFileGroupId fileGroupId : fileGroupIds) { + KeyValue keyValue = new KeyValue(Bytes.toBytes(getFileGroupKey(fileGroupId)), new byte[0], new byte[0], + HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, new byte[0]); + if (scanner.seekTo(keyValue) == 0) { + ByteBuffer readValue = scanner.getValue(); + byte[] valBytes = Bytes.toBytes(readValue); + BootstrapSourceFilePartitionInfo fileInfo = TimelineMetadataUtils.deserializeAvroMetadata(valBytes, + BootstrapSourceFilePartitionInfo.class); + BootstrapSourceFileMapping mapping = new BootstrapSourceFileMapping(sourceBasePath, + fileInfo.getSourcePartitionPath(), fileInfo.getHudiPartitionPath(), fileInfo.getSourceFileStatus(), + fileGroupId.getFileId()); + result.put(fileGroupId, mapping); + } + } + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + } + return result; + } + + /** + * Helper method to create HFile Reader. 
+ * @param hFilePath File Path + * @param conf Configuration + * @param fileSystem File System + * @return + */ + private static HFile.Reader createReader(String hFilePath, Configuration conf, FileSystem fileSystem) { + try { + LOG.info("Opening HFile for reading :" + hFilePath); + CacheConfig config = new CacheConfig(conf); + HFile.Reader reader = HFile.createReader(fileSystem, new HFilePathForReader(hFilePath), new CacheConfig(conf), + conf); + return reader; Review comment: nit: May be just do ``` return HFile.createReader(fileSystem, new HFilePathForReader(hFilePath), new CacheConfig(conf), conf); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
