vinothchandar commented on a change in pull request #1687:
URL: https://github.com/apache/hudi/pull/1687#discussion_r434283607
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
##########
@@ -71,8 +71,9 @@ public RollbackHelper(HoodieTableMetaClient metaClient,
HoodieWriteConfig config
*/
public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc,
HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
SerializablePathFilter filter = (path) -> {
- if (path.toString().contains(".parquet")) {
+ if (path.toString().contains(basefileExtension)) {
Review comment:
Have you tried to fish out all such occurrences in this pr. This is
great!
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -133,6 +136,12 @@ private void init(String fileId, String partitionPath,
HoodieBaseFile dataFileTo
// Create the writer for writing the new version file
storageWriter =
HoodieStorageWriterFactory.getStorageWriter(instantTime,
newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
+
+ if (hoodieTable.requireSortedRecords()) {
Review comment:
Can we push the sorting to the spark shuffle machinery?
repartitionAndSortWithinPartitions(). It cheap and practically free
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -214,6 +223,36 @@ private boolean writeRecord(HoodieRecord<T> hoodieRecord,
Option<IndexedRecord>
*/
public void write(GenericRecord oldRecord) {
String key =
oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+
+ if (hoodieTable.requireSortedRecords()) {
Review comment:
If we push sorting to spark, then all the iterators fed to
create/merge/append handle will sort records by records key.. for merge handle
we can do a simple merge sort style sort merge (instead of hash merge). We just
assert that both existing files and incoming records are sorted
##########
File path: hudi-common/pom.xml
##########
@@ -78,6 +92,38 @@
</imports>
</configuration>
</plugin>
+ <plugin>
Review comment:
hudi-common has Scala now? Why do we need this
##########
File path: hudi-common/pom.xml
##########
@@ -201,7 +247,26 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Spark -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
Review comment:
Why do we need this? hudi-common having spark is an anti pattern
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
##########
@@ -872,7 +873,7 @@ public void createPool(JobConf conf, PathFilter... filters)
{
job.set("hudi.hive.realtime", "true");
InputSplit[] splits;
if (hoodieFilter) {
- HoodieParquetInputFormat input = new
HoodieParquetRealtimeInputFormat();
+ HoodieParquetRealtimeInputFormat input = new
HoodieParquetRealtimeInputFormat();
Review comment:
Is this change strictly necessary for this pr
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -18,250 +18,23 @@
package org.apache.hudi.hadoop.realtime;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
- * Input Format, that provides a real-time view of data in a Hoodie table.
+ * HoodieRealtimeInputFormat for HUDI datasets which store data in Parquet
base file format.
*/
@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
-public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat
implements Configurable {
-
- private static final Logger LOG =
LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
-
- // These positions have to be deterministic across all tables
- public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
Review comment:
Same thing here. This code is already changing momentarily
##########
File path:
hudi-common/src/main/scala/com/databricks/spark/avro/HoodieAvroSchemaConversion.scala
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.databricks.spark.avro
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.DataType
+
+/**
+ * This helper class is required since SchemaConverters.createConverterToSQL
is currently private.
+ */
+object HoodieAvroSchemaConversion {
+ def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType):
(GenericRecord) => Row =
Review comment:
Introducing scala into hudi-common is a no-go.. there are non spark
query bundles built off this, which will all need different scala version
artifacts
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
##########
@@ -18,339 +18,14 @@
package org.apache.hudi.hadoop;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
/**
- * HoodieInputFormat which understands the Hoodie File Structure and filters
files based on the Hoodie Mode. If paths
- * that does not correspond to a hoodie table then they are passed in as is
(as what FileInputFormat.listStatus()
- * would do). The JobConf could have paths from multipe Hoodie/Non-Hoodie
tables
+ * HoodieInputFormat for HUDI datasets which store data in Parquet base file
format.
*/
@UseFileSplitsFromInputFormat
-public class HoodieParquetInputFormat extends MapredParquetInputFormat
implements Configurable {
-
- private static final Logger LOG =
LogManager.getLogger(HoodieParquetInputFormat.class);
-
- protected Configuration conf;
-
- @Override
- public FileStatus[] listStatus(JobConf job) throws IOException {
- // Segregate inputPaths[] to incremental, snapshot and non hoodie paths
- List<String> incrementalTables =
HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(job));
- InputPathHandler inputPathHandler = new InputPathHandler(conf,
getInputPaths(job), incrementalTables);
- List<FileStatus> returns = new ArrayList<>();
-
- Map<String, HoodieTableMetaClient> tableMetaClientMap =
inputPathHandler.getTableMetaClientMap();
- // process incremental pulls first
- for (String table : incrementalTables) {
- HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
- if (metaClient == null) {
- /* This can happen when the INCREMENTAL mode is set for a table but
there were no InputPaths
- * in the jobConf
- */
- continue;
- }
- List<Path> inputPaths =
inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
- List<FileStatus> result = listStatusForIncrementalMode(job, metaClient,
inputPaths);
- if (result != null) {
- returns.addAll(result);
- }
- }
-
- // process non hoodie Paths next.
- List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
- if (nonHoodiePaths.size() > 0) {
- setInputPaths(job, nonHoodiePaths.toArray(new
Path[nonHoodiePaths.size()]));
- FileStatus[] fileStatuses = super.listStatus(job);
- returns.addAll(Arrays.asList(fileStatuses));
- }
-
- // process snapshot queries next.
- List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
- if (snapshotPaths.size() > 0) {
- setInputPaths(job, snapshotPaths.toArray(new
Path[snapshotPaths.size()]));
- FileStatus[] fileStatuses = super.listStatus(job);
- Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
- groupFileStatusForSnapshotPaths(fileStatuses,
tableMetaClientMap.values());
- LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
- for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry :
groupedFileStatus.entrySet()) {
- List<FileStatus> result =
filterFileStatusForSnapshotMode(entry.getKey(), entry.getValue());
- if (result != null) {
- returns.addAll(result);
- }
- }
- }
- return returns.toArray(new FileStatus[returns.size()]);
- }
-
- /**
- * Filter any specific instants that we do not want to process.
- * example timeline:
- *
- * t0 -> create bucket1.parquet
- * t1 -> create and append updates bucket1.log
- * t2 -> request compaction
- * t3 -> create bucket2.parquet
- *
- * if compaction at t2 takes a long time, incremental readers on RO tables
can move to t3 and would skip updates in t1
- *
- * To workaround this problem, we want to stop returning data belonging to
commits > t2.
- * After compaction is complete, incremental reader would see updates in t2,
t3, so on.
- */
- protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline
timeline) {
- HoodieDefaultTimeline commitsAndCompactionTimeline =
timeline.getCommitsAndCompactionTimeline();
- Option<HoodieInstant> pendingCompactionInstant =
commitsAndCompactionTimeline.filterPendingCompactionTimeline().firstInstant();
- if (pendingCompactionInstant.isPresent()) {
- HoodieDefaultTimeline instantsTimeline =
commitsAndCompactionTimeline.findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
- int numCommitsFilteredByCompaction =
commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
- - instantsTimeline.getCommitsTimeline().countInstants();
- LOG.info("Earliest pending compaction instant is: " +
pendingCompactionInstant.get().getTimestamp()
- + " skipping " + numCommitsFilteredByCompaction + " commits");
-
- return instantsTimeline;
- } else {
- return timeline;
- }
- }
-
- /**
- * Achieves listStatus functionality for an incrementally queried table.
Instead of listing all
- * partitions and then filtering based on the commits of interest, this
logic first extracts the
- * partitions touched by the desired commits and then lists only those
partitions.
- */
- private List<FileStatus> listStatusForIncrementalMode(
- JobConf job, HoodieTableMetaClient tableMetaClient, List<Path>
inputPaths) throws IOException {
- String tableName = tableMetaClient.getTableConfig().getTableName();
- Job jobContext = Job.getInstance(job);
- HoodieDefaultTimeline baseTimeline;
- if (HoodieHiveUtil.stopAtCompaction(jobContext, tableName)) {
- baseTimeline =
filterInstantsTimeline(tableMetaClient.getActiveTimeline());
- } else {
- baseTimeline = tableMetaClient.getActiveTimeline();
- }
-
- HoodieTimeline timeline =
baseTimeline.getCommitsTimeline().filterCompletedInstants();
- String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(jobContext,
tableName);
- // Total number of commits to return in this batch. Set this to -1 to get
all the commits.
- Integer maxCommits = HoodieHiveUtil.readMaxCommits(jobContext, tableName);
- LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
- List<HoodieInstant> commitsToCheck =
timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
- .getInstants().collect(Collectors.toList());
- // Extract partitions touched by the commitsToCheck
- Set<String> partitionsToList = new HashSet<>();
- for (HoodieInstant commit : commitsToCheck) {
- HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
- HoodieCommitMetadata.class);
-
partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
- }
- if (partitionsToList.isEmpty()) {
- return null;
- }
- String incrementalInputPaths = partitionsToList.stream()
- .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
- .filter(s -> {
- /*
- * Ensure to return only results from the original input path that
has incremental changes
- * This check is needed for the following corner case - When the
caller invokes
- * HoodieInputFormat.listStatus multiple times (with small batches
of Hive partitions each
- * time. Ex. Hive fetch task calls listStatus for every partition
once) we do not want to
- * accidentally return all incremental changes for the entire table
in every listStatus()
- * call. This will create redundant splits. Instead we only want to
return the incremental
- * changes (if so any) in that batch of input paths.
- *
- * NOTE on Hive queries that are executed using Fetch task:
- * Since Fetch tasks invoke InputFormat.listStatus() per partition,
Hoodie metadata can be
- * listed in every such listStatus() call. In order to avoid this,
it might be useful to
- * disable fetch tasks using the hive session property for
incremental queries:
- * `set hive.fetch.task.conversion=none;`
- * This would ensure Map Reduce execution is chosen for a Hive
query, which combines
- * partitions (comma separated) and calls InputFormat.listStatus()
only once with all
- * those partitions.
- */
- for (Path path : inputPaths) {
- if (path.toString().contains(s)) {
- return true;
- }
- }
- return false;
- })
- .collect(Collectors.joining(","));
- if (StringUtils.isNullOrEmpty(incrementalInputPaths)) {
- return null;
- }
- // Mutate the JobConf to set the input paths to only partitions touched by
incremental pull.
- setInputPaths(job, incrementalInputPaths);
- FileStatus[] fileStatuses = super.listStatus(job);
- BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient,
timeline, fileStatuses);
- List<String> commitsList =
commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
- List<HoodieBaseFile> filteredFiles =
roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
- List<FileStatus> returns = new ArrayList<>();
- for (HoodieBaseFile filteredFile : filteredFiles) {
- LOG.debug("Processing incremental hoodie file - " +
filteredFile.getPath());
- filteredFile = checkFileStatus(filteredFile);
- returns.add(filteredFile.getFileStatus());
- }
- LOG.info("Total paths to process after hoodie incremental filter " +
filteredFiles.size());
- return returns;
- }
-
- /**
- * Takes in a list of filesStatus and a list of table metadatas. Groups the
files status list
- * based on given table metadata.
- * @param fileStatuses
- * @param metaClientList
- * @return
- * @throws IOException
- */
- private Map<HoodieTableMetaClient, List<FileStatus>>
groupFileStatusForSnapshotPaths(
- FileStatus[] fileStatuses, Collection<HoodieTableMetaClient>
metaClientList) {
- // This assumes the paths for different tables are grouped together
- Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
- HoodieTableMetaClient metadata = null;
- for (FileStatus status : fileStatuses) {
- Path inputPath = status.getPath();
- if (!inputPath.getName().endsWith(".parquet")) {
- //FIXME(vc): skip non parquet files for now. This wont be needed once
log file name start
- // with "."
- continue;
- }
- if ((metadata == null) ||
(!inputPath.toString().contains(metadata.getBasePath()))) {
- for (HoodieTableMetaClient metaClient : metaClientList) {
- if (inputPath.toString().contains(metaClient.getBasePath())) {
- metadata = metaClient;
- if (!grouped.containsKey(metadata)) {
- grouped.put(metadata, new ArrayList<>());
- }
- break;
- }
- }
- }
- grouped.get(metadata).add(status);
- }
- return grouped;
- }
-
- /**
- * Filters data files for a snapshot queried table.
- */
- private List<FileStatus> filterFileStatusForSnapshotMode(
- HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
- FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" +
metadata);
- }
- // Get all commits, delta commits, compactions, as all of them produce a
base parquet file today
- HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
- BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata,
timeline, statuses);
- // filter files on the latest commit found
- List<HoodieBaseFile> filteredFiles =
roView.getLatestBaseFiles().collect(Collectors.toList());
- LOG.info("Total paths to process after hoodie filter " +
filteredFiles.size());
- List<FileStatus> returns = new ArrayList<>();
- for (HoodieBaseFile filteredFile : filteredFiles) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
- }
- filteredFile = checkFileStatus(filteredFile);
- returns.add(filteredFile.getFileStatus());
- }
- return returns;
- }
-
- /**
- * Checks the file status for a race condition which can set the file size
to 0. 1. HiveInputFormat does
- * super.listStatus() and gets back a FileStatus[] 2. Then it creates the
HoodieTableMetaClient for the paths listed.
- * 3. Generation of splits looks at FileStatus size to create splits, which
skips this file
- */
- private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) {
- Path dataPath = dataFile.getFileStatus().getPath();
- try {
- if (dataFile.getFileSize() == 0) {
- FileSystem fs = dataPath.getFileSystem(conf);
- LOG.info("Refreshing file status " + dataFile.getPath());
- return new HoodieBaseFile(fs.getFileStatus(dataPath));
- }
- return dataFile;
- } catch (IOException e) {
- throw new HoodieIOException("Could not get FileStatus on path " +
dataPath);
- }
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public RecordReader<NullWritable, ArrayWritable> getRecordReader(final
InputSplit split, final JobConf job,
- final Reporter reporter) throws IOException {
- // TODO enable automatic predicate pushdown after fixing issues
- // FileSplit fileSplit = (FileSplit) split;
- // HoodieTableMetadata metadata =
getTableMetadata(fileSplit.getPath().getParent());
- // String tableName = metadata.getTableName();
- // String mode = HoodieHiveUtil.readMode(job, tableName);
-
- // if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
- // FilterPredicate predicate = constructHoodiePredicate(job, tableName,
split);
- // LOG.info("Setting parquet predicate push down as " + predicate);
- // ParquetInputFormat.setFilterPredicate(job, predicate);
- // clearOutExistingPredicate(job);
- // }
- return super.getRecordReader(split, job, reporter);
- }
-
- /**
- * Read the table metadata from a data path. This assumes certain hierarchy
of files which should be changed once a
- * better way is figured out to pass in the hoodie meta directory
- */
- protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs,
Path dataPath) throws IOException {
- int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH;
- if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
- HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs,
dataPath);
- metadata.readFromFS();
- levels = metadata.getPartitionDepth();
- }
- Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
- LOG.info("Reading hoodie metadata from path " + baseDir.toString());
- return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
+public class HoodieParquetInputFormat extends HoodieInputFormat {
+ public HoodieParquetInputFormat() {
+ super(new MapredParquetInputFormat());
Review comment:
AFAIK hive will check in places whether the registered input format is
instanceof ParquetInputFormat.. for applying certain optimizations. It’s
probably better to not change the class hierarchy, but try to abstract by using
more modular classes
##########
File path:
hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
##########
@@ -34,29 +35,29 @@
import java.io.IOException;
-import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
public class HoodieStorageWriterFactory {
public static <T extends HoodieRecordPayload, R extends IndexedRecord>
HoodieStorageWriter<R> getStorageWriter(
String instantTime, Path path, HoodieTable<T> hoodieTable,
HoodieWriteConfig config, Schema schema,
SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
- final String name = path.getName();
- final String extension = FSUtils.isLogFile(path) ?
HOODIE_LOG.getFileExtension() : FSUtils.getFileExtension(name);
+ final HoodieTableConfig tableConfig =
hoodieTable.getMetaClient().getTableConfig();
+ final String extension = FSUtils.isLogFile(path) ?
tableConfig.getLogFileFormat().getFileExtension() :
tableConfig.getBaseFileFormat().getFileExtension();
if (PARQUET.getFileExtension().equals(extension)) {
return newParquetStorageWriter(instantTime, path, config, schema,
hoodieTable, sparkTaskContextSupplier);
}
+ if (HFILE.getFileExtension().equals(extension)) {
+ return newHFileStorageWriter(instantTime, path, config, schema,
hoodieTable, sparkTaskContextSupplier);
Review comment:
StorageWriter is not a great name really.. let’s rename consistently to
HoodieFileWriter/HoodieFileWriterFactory?
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
##########
@@ -89,11 +87,12 @@ public CommitActionExecutor(JavaSparkContext jsc,
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + "
for fileId: " + fileId);
} else {
- AvroReadSupport.setAvroReadSchema(table.getHadoopConf(),
upsertHandle.getWriterSchema());
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper =
null;
- try (ParquetReader<IndexedRecord> reader =
-
AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(table.getHadoopConf()).build())
{
- wrapper = new SparkBoundedInMemoryExecutor(config, new
ParquetReaderIterator(reader),
+ try {
+ HoodieStorageReader<IndexedRecord> storageReader =
+ HoodieStorageReaderFactory.getStorageReader(table.getHadoopConf(),
upsertHandle.getOldFilePath());
+ wrapper =
+ new SparkBoundedInMemoryExecutor(config,
storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
Review comment:
Assume this actually pushed the projection predicates down??
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -214,6 +223,36 @@ private boolean writeRecord(HoodieRecord<T> hoodieRecord,
Option<IndexedRecord>
*/
public void write(GenericRecord oldRecord) {
String key =
oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+
+ if (hoodieTable.requireSortedRecords()) {
Review comment:
The advantage is more modularity as well as low memory overhead to
merge..
Note that we don’t do this for data today because we may not necessarily
want the data Sorted by key
##########
File path: hudi-common/pom.xml
##########
@@ -201,7 +247,26 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Spark -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- spark-avro -->
+ <dependency>
+ <groupId>com.databricks</groupId>
+ <artifactId>spark-avro_${scala.binary.version}</artifactId>
+ <version>4.0.0</version>
Review comment:
Please pull this into a variable in parent pom
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -214,6 +223,36 @@ private boolean writeRecord(HoodieRecord<T> hoodieRecord,
Option<IndexedRecord>
*/
public void write(GenericRecord oldRecord) {
String key =
oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+
+ if (hoodieTable.requireSortedRecords()) {
+ // To maintain overall sorted order across updates and inserts, write
any new inserts whose keys are less than
+ // the oldRecord's key.
+ while (!newRecordKeysSorted.isEmpty() &&
newRecordKeysSorted.peek().compareTo(key) <= 0) {
Review comment:
I did not review this block too closely. Let’s settle on the high level
approach first
##########
File path:
hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
##########
@@ -66,4 +67,21 @@
return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema,
sparkTaskContextSupplier);
}
+
+ private static <T extends HoodieRecordPayload, R extends IndexedRecord>
HoodieStorageWriter<R> newHFileStorageWriter(
+ String instantTime, Path path, HoodieWriteConfig config, Schema schema,
HoodieTable hoodieTable,
+ SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
+
+ BloomFilter filter = createBloomFilter(config);
Review comment:
HFilev itself contains bloom filters right.. why do we need them
outside?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import scala.Tuple2;
+
+public class HoodieHFileReader<R extends IndexedRecord> implements
HoodieStorageReader {
+ private static final Logger LOG =
LogManager.getLogger(HoodieHFileReader.class);
+ private Path path;
+ private Configuration conf;
+ private HFile.Reader reader;
+
+ public static final String KEY_SCHEMA = "schema";
+ public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
+ public static final String KEY_BLOOM_FILTER_TYPE_CODE =
"bloomFilterTypeCode";
+ public static final String KEY_MIN_RECORD = "minRecordKey";
+ public static final String KEY_MAX_RECORD = "maxRecordKey";
+
+ public HoodieHFileReader(Configuration configuration, Path path, CacheConfig
cacheConfig) throws IOException {
+ this.conf = configuration;
+ this.path = path;
+ this.reader = HFile.createReader(FSUtils.getFs(path.toString(),
configuration), path, cacheConfig, conf);
+ }
+
+ @Override
+ public String[] readMinMaxRecordKeys() {
+ Map<byte[], byte[]> fileInfo;
+ try {
+ fileInfo = reader.loadFileInfo();
+ return new String[] { new
String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
+ new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
+ } catch (IOException e) {
+ throw new HoodieException("Could not read min/max record key out of file
information block correctly from path", e);
+ }
+ }
+
+ @Override
+ public Schema getSchema() {
+ try {
+ Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+ return new Schema.Parser().parse(new
String(fileInfo.get(KEY_SCHEMA.getBytes())));
+ } catch (IOException e) {
+ throw new HoodieException("Could not read schema of file from path", e);
+ }
+
+ }
+
+ @Override
+ public BloomFilter readBloomFilter() {
+ Map<byte[], byte[]> fileInfo;
+ try {
+ fileInfo = reader.loadFileInfo();
+ ByteBuffer serializedFilter =
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false);
+ byte[] filterBytes = new byte[serializedFilter.remaining()];
+ serializedFilter.get(filterBytes); // read the bytes that were written
+ return BloomFilterFactory.fromString(new String(filterBytes),
+ new String(fileInfo.get(KEY_BLOOM_FILTER_TYPE_CODE.getBytes())));
+ } catch (IOException e) {
+ throw new HoodieException("Could not read bloom filter from " + path, e);
+ }
+ }
+
+ @Override
+ public Set<String> filterRowKeys(Set candidateRowKeys) {
Review comment:
Use of raw type
##########
File path:
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import scala.Tuple2;
+
+public class HoodieHFileReader<R extends IndexedRecord> implements
HoodieStorageReader {
Review comment:
Let’s rename HoodieStorageReader to HoodieFileReader (also the factory)
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+/**
+ * HoodieInputFormat for HUDI datasets which store data in HFile base file
format.
Review comment:
Why do we need this?
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
##########
@@ -872,7 +873,7 @@ public void createPool(JobConf conf, PathFilter... filters)
{
job.set("hudi.hive.realtime", "true");
InputSplit[] splits;
if (hoodieFilter) {
- HoodieParquetInputFormat input = new
HoodieParquetRealtimeInputFormat();
+ HoodieParquetRealtimeInputFormat input = new
HoodieParquetRealtimeInputFormat();
Review comment:
This is a behavior change.. ccc @n3nash to confirm if this is ok
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
##########
@@ -18,339 +18,14 @@
package org.apache.hudi.hadoop;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
/**
- * HoodieInputFormat which understands the Hoodie File Structure and filters
files based on the Hoodie Mode. If paths
- * that does not correspond to a hoodie table then they are passed in as is
(as what FileInputFormat.listStatus()
- * would do). The JobConf could have paths from multipe Hoodie/Non-Hoodie
tables
+ * HoodieInputFormat for HUDI datasets which store data in Parquet base file
format.
*/
@UseFileSplitsFromInputFormat
-public class HoodieParquetInputFormat extends MapredParquetInputFormat
implements Configurable {
-
- private static final Logger LOG =
LogManager.getLogger(HoodieParquetInputFormat.class);
-
- protected Configuration conf;
-
- @Override
- public FileStatus[] listStatus(JobConf job) throws IOException {
- // Segregate inputPaths[] to incremental, snapshot and non hoodie paths
- List<String> incrementalTables =
HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(job));
- InputPathHandler inputPathHandler = new InputPathHandler(conf,
getInputPaths(job), incrementalTables);
- List<FileStatus> returns = new ArrayList<>();
-
- Map<String, HoodieTableMetaClient> tableMetaClientMap =
inputPathHandler.getTableMetaClientMap();
- // process incremental pulls first
- for (String table : incrementalTables) {
- HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
- if (metaClient == null) {
- /* This can happen when the INCREMENTAL mode is set for a table but
there were no InputPaths
- * in the jobConf
- */
- continue;
- }
- List<Path> inputPaths =
inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
- List<FileStatus> result = listStatusForIncrementalMode(job, metaClient,
inputPaths);
- if (result != null) {
- returns.addAll(result);
- }
- }
-
- // process non hoodie Paths next.
- List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
- if (nonHoodiePaths.size() > 0) {
- setInputPaths(job, nonHoodiePaths.toArray(new
Path[nonHoodiePaths.size()]));
- FileStatus[] fileStatuses = super.listStatus(job);
- returns.addAll(Arrays.asList(fileStatuses));
- }
-
- // process snapshot queries next.
- List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
- if (snapshotPaths.size() > 0) {
- setInputPaths(job, snapshotPaths.toArray(new
Path[snapshotPaths.size()]));
- FileStatus[] fileStatuses = super.listStatus(job);
- Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
- groupFileStatusForSnapshotPaths(fileStatuses,
tableMetaClientMap.values());
- LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
- for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry :
groupedFileStatus.entrySet()) {
- List<FileStatus> result =
filterFileStatusForSnapshotMode(entry.getKey(), entry.getValue());
- if (result != null) {
- returns.addAll(result);
- }
- }
- }
- return returns.toArray(new FileStatus[returns.size()]);
- }
-
- /**
- * Filter any specific instants that we do not want to process.
- * example timeline:
- *
- * t0 -> create bucket1.parquet
- * t1 -> create and append updates bucket1.log
- * t2 -> request compaction
- * t3 -> create bucket2.parquet
- *
- * if compaction at t2 takes a long time, incremental readers on RO tables
can move to t3 and would skip updates in t1
- *
- * To workaround this problem, we want to stop returning data belonging to
commits > t2.
- * After compaction is complete, incremental reader would see updates in t2,
t3, so on.
- */
- protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline
timeline) {
- HoodieDefaultTimeline commitsAndCompactionTimeline =
timeline.getCommitsAndCompactionTimeline();
- Option<HoodieInstant> pendingCompactionInstant =
commitsAndCompactionTimeline.filterPendingCompactionTimeline().firstInstant();
- if (pendingCompactionInstant.isPresent()) {
- HoodieDefaultTimeline instantsTimeline =
commitsAndCompactionTimeline.findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
- int numCommitsFilteredByCompaction =
commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
- - instantsTimeline.getCommitsTimeline().countInstants();
- LOG.info("Earliest pending compaction instant is: " +
pendingCompactionInstant.get().getTimestamp()
- + " skipping " + numCommitsFilteredByCompaction + " commits");
-
- return instantsTimeline;
- } else {
- return timeline;
- }
- }
-
- /**
- * Achieves listStatus functionality for an incrementally queried table.
Instead of listing all
- * partitions and then filtering based on the commits of interest, this
logic first extracts the
- * partitions touched by the desired commits and then lists only those
partitions.
- */
- private List<FileStatus> listStatusForIncrementalMode(
- JobConf job, HoodieTableMetaClient tableMetaClient, List<Path>
inputPaths) throws IOException {
- String tableName = tableMetaClient.getTableConfig().getTableName();
- Job jobContext = Job.getInstance(job);
- HoodieDefaultTimeline baseTimeline;
- if (HoodieHiveUtil.stopAtCompaction(jobContext, tableName)) {
- baseTimeline =
filterInstantsTimeline(tableMetaClient.getActiveTimeline());
- } else {
- baseTimeline = tableMetaClient.getActiveTimeline();
- }
-
- HoodieTimeline timeline =
baseTimeline.getCommitsTimeline().filterCompletedInstants();
- String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(jobContext,
tableName);
- // Total number of commits to return in this batch. Set this to -1 to get
all the commits.
- Integer maxCommits = HoodieHiveUtil.readMaxCommits(jobContext, tableName);
- LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
- List<HoodieInstant> commitsToCheck =
timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
- .getInstants().collect(Collectors.toList());
- // Extract partitions touched by the commitsToCheck
- Set<String> partitionsToList = new HashSet<>();
- for (HoodieInstant commit : commitsToCheck) {
- HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
- HoodieCommitMetadata.class);
-
partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
- }
- if (partitionsToList.isEmpty()) {
- return null;
- }
- String incrementalInputPaths = partitionsToList.stream()
- .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
- .filter(s -> {
- /*
- * Ensure to return only results from the original input path that
has incremental changes
- * This check is needed for the following corner case - When the
caller invokes
- * HoodieInputFormat.listStatus multiple times (with small batches
of Hive partitions each
- * time. Ex. Hive fetch task calls listStatus for every partition
once) we do not want to
- * accidentally return all incremental changes for the entire table
in every listStatus()
- * call. This will create redundant splits. Instead we only want to
return the incremental
- * changes (if so any) in that batch of input paths.
- *
- * NOTE on Hive queries that are executed using Fetch task:
- * Since Fetch tasks invoke InputFormat.listStatus() per partition,
Hoodie metadata can be
- * listed in every such listStatus() call. In order to avoid this,
it might be useful to
- * disable fetch tasks using the hive session property for
incremental queries:
- * `set hive.fetch.task.conversion=none;`
- * This would ensure Map Reduce execution is chosen for a Hive
query, which combines
- * partitions (comma separated) and calls InputFormat.listStatus()
only once with all
- * those partitions.
- */
- for (Path path : inputPaths) {
- if (path.toString().contains(s)) {
- return true;
- }
- }
- return false;
- })
- .collect(Collectors.joining(","));
- if (StringUtils.isNullOrEmpty(incrementalInputPaths)) {
- return null;
- }
- // Mutate the JobConf to set the input paths to only partitions touched by
incremental pull.
- setInputPaths(job, incrementalInputPaths);
- FileStatus[] fileStatuses = super.listStatus(job);
- BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient,
timeline, fileStatuses);
- List<String> commitsList =
commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
- List<HoodieBaseFile> filteredFiles =
roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
- List<FileStatus> returns = new ArrayList<>();
- for (HoodieBaseFile filteredFile : filteredFiles) {
- LOG.debug("Processing incremental hoodie file - " +
filteredFile.getPath());
- filteredFile = checkFileStatus(filteredFile);
- returns.add(filteredFile.getFileStatus());
- }
- LOG.info("Total paths to process after hoodie incremental filter " +
filteredFiles.size());
- return returns;
- }
-
- /**
- * Takes in a list of filesStatus and a list of table metadatas. Groups the
files status list
- * based on given table metadata.
- * @param fileStatuses
- * @param metaClientList
- * @return
- * @throws IOException
- */
- private Map<HoodieTableMetaClient, List<FileStatus>>
groupFileStatusForSnapshotPaths(
- FileStatus[] fileStatuses, Collection<HoodieTableMetaClient>
metaClientList) {
- // This assumes the paths for different tables are grouped together
- Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
- HoodieTableMetaClient metadata = null;
- for (FileStatus status : fileStatuses) {
- Path inputPath = status.getPath();
- if (!inputPath.getName().endsWith(".parquet")) {
- //FIXME(vc): skip non parquet files for now. This wont be needed once
log file name start
- // with "."
- continue;
- }
- if ((metadata == null) ||
(!inputPath.toString().contains(metadata.getBasePath()))) {
- for (HoodieTableMetaClient metaClient : metaClientList) {
- if (inputPath.toString().contains(metaClient.getBasePath())) {
- metadata = metaClient;
- if (!grouped.containsKey(metadata)) {
- grouped.put(metadata, new ArrayList<>());
- }
- break;
- }
- }
- }
- grouped.get(metadata).add(status);
- }
- return grouped;
- }
-
- /**
- * Filters data files for a snapshot queried table.
- */
- private List<FileStatus> filterFileStatusForSnapshotMode(
- HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
- FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" +
metadata);
- }
- // Get all commits, delta commits, compactions, as all of them produce a
base parquet file today
- HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
- BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata,
timeline, statuses);
- // filter files on the latest commit found
- List<HoodieBaseFile> filteredFiles =
roView.getLatestBaseFiles().collect(Collectors.toList());
- LOG.info("Total paths to process after hoodie filter " +
filteredFiles.size());
- List<FileStatus> returns = new ArrayList<>();
- for (HoodieBaseFile filteredFile : filteredFiles) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
- }
- filteredFile = checkFileStatus(filteredFile);
- returns.add(filteredFile.getFileStatus());
- }
- return returns;
- }
-
- /**
- * Checks the file status for a race condition which can set the file size
to 0. 1. HiveInputFormat does
- * super.listStatus() and gets back a FileStatus[] 2. Then it creates the
HoodieTableMetaClient for the paths listed.
- * 3. Generation of splits looks at FileStatus size to create splits, which
skips this file
- */
- private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) {
- Path dataPath = dataFile.getFileStatus().getPath();
- try {
- if (dataFile.getFileSize() == 0) {
- FileSystem fs = dataPath.getFileSystem(conf);
- LOG.info("Refreshing file status " + dataFile.getPath());
- return new HoodieBaseFile(fs.getFileStatus(dataPath));
- }
- return dataFile;
- } catch (IOException e) {
- throw new HoodieIOException("Could not get FileStatus on path " +
dataPath);
- }
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public RecordReader<NullWritable, ArrayWritable> getRecordReader(final
InputSplit split, final JobConf job,
- final Reporter reporter) throws IOException {
- // TODO enable automatic predicate pushdown after fixing issues
- // FileSplit fileSplit = (FileSplit) split;
- // HoodieTableMetadata metadata =
getTableMetadata(fileSplit.getPath().getParent());
- // String tableName = metadata.getTableName();
- // String mode = HoodieHiveUtil.readMode(job, tableName);
-
- // if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
- // FilterPredicate predicate = constructHoodiePredicate(job, tableName,
split);
- // LOG.info("Setting parquet predicate push down as " + predicate);
- // ParquetInputFormat.setFilterPredicate(job, predicate);
- // clearOutExistingPredicate(job);
- // }
- return super.getRecordReader(split, job, reporter);
- }
-
- /**
- * Read the table metadata from a data path. This assumes certain hierarchy
of files which should be changed once a
- * better way is figured out to pass in the hoodie meta directory
- */
- protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs,
Path dataPath) throws IOException {
- int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH;
- if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
- HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs,
dataPath);
- metadata.readFromFS();
- levels = metadata.getPartitionDepth();
- }
- Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
- LOG.info("Reading hoodie metadata from path " + baseDir.toString());
- return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
+public class HoodieParquetInputFormat extends HoodieInputFormat {
Review comment:
This code is being actively refactored by @garyli1019 and @bhasudha as
well. Gary’s pr will land soon I believe..
##########
File path: style/checkstyle-suppressions.xml
##########
@@ -26,4 +26,5 @@
<!-- Member Names expected to start with "_" -->
<suppress checks="naming" files="TestRecord.java" lines="1-9999"/>
<suppress checks="IllegalImport" files="Option.java" />
+ <suppress checks="naming" files="HoodieInputFormat.java" lines="73"/>
Review comment:
Why does checkstyle complain
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]