This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 0abc00df841 [HUDI-6787] Implement the HoodieFileGroupReader API for Hive (#10422)
0abc00df841 is described below
commit 0abc00df8412c5ea3d15ab50d5074d8e8bccebcb
Author: Jon Vexler <[email protected]>
AuthorDate: Sat Jun 8 22:22:46 2024 -0400
[HUDI-6787] Implement the HoodieFileGroupReader API for Hive (#10422)
---
.../hudi/client/TestPartitionTTLManagement.java | 2 +-
.../hudi/table/TestHoodieMergeOnReadTable.java | 2 +-
.../TestHoodieSparkMergeOnReadTableCompaction.java | 80 +++---
.../hudi/common/engine/HoodieReaderContext.java | 29 ++-
.../org/apache/hudi/common/model/HoodieRecord.java | 2 +-
.../org/apache/hudi/hadoop/fs/HadoopFSUtils.java | 15 +-
.../hudi/hadoop/HiveHoodieReaderContext.java | 273 ++++++++++++++++++++
.../HoodieFileGroupReaderBasedRecordReader.java | 281 +++++++++++++++++++++
.../org/apache/hudi/hadoop/HoodieHiveRecord.java | 221 ++++++++++++++++
.../apache/hudi/hadoop/HoodieHiveRecordMerger.java | 71 ++++++
.../hudi/hadoop/HoodieParquetInputFormat.java | 48 +++-
.../hudi/hadoop/RecordReaderValueIterator.java | 13 +-
.../HoodieCombineRealtimeRecordReader.java | 51 +++-
.../realtime/HoodieParquetRealtimeInputFormat.java | 15 +-
.../hadoop/utils/HoodieArrayWritableAvroUtils.java | 110 ++++++++
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 36 +++
.../hudi/hadoop/utils/ObjectInspectorCache.java | 103 ++++++++
.../hudi/hadoop/TestHoodieParquetInputFormat.java | 122 ++++-----
.../hive/TestHoodieCombineHiveInputFormat.java | 14 +-
.../TestHoodieMergeOnReadSnapshotReader.java | 2 +
.../realtime/TestHoodieRealtimeRecordReader.java | 2 +
.../utils/TestHoodieArrayWritableAvroUtils.java | 88 +++++++
.../org/apache/hudi/functional/TestBootstrap.java | 1 +
.../functional/TestHiveTableSchemaEvolution.java | 2 +
.../TestSparkConsistentBucketClustering.java | 2 +-
.../streamer/TestHoodieStreamerUtils.java | 13 +-
26 files changed, 1470 insertions(+), 128 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
index cda76154ca6..f4e9d206f06 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
@@ -182,7 +182,7 @@ public class TestPartitionTTLManagement extends HoodieClientTestBase {
   private List<GenericRecord> readRecords(String[] partitions) {
     return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf,
         Arrays.stream(partitions).map(p -> Paths.get(basePath, p).toString()).collect(Collectors.toList()),
-        basePath, new JobConf(storageConf.unwrap()), true, false);
+        basePath, new JobConf(storageConf.unwrap()), true, true);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index b0876d06103..ae81a310190 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -213,7 +213,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
         .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
         .collect(Collectors.toList());
     List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf(), inputPaths,
-        basePath(), new JobConf(storageConf().unwrap()), true, false);
+        basePath(), new JobConf(storageConf().unwrap()), true, populateMetaFields);
     // Wrote 20 records in 2 batches
     assertEquals(40, recordsRead.size(), "Must contain 40 records");
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index e2ba56f94a3..ef28980d9cf 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -22,6 +22,7 @@ package org.apache.hudi.table.functional;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
@@ -146,43 +147,50 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
   @ParameterizedTest
   @MethodSource("writeLogTest")
   public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
-    Properties props = getPropertiesForKeyGen(true);
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
-        .forTable("test-trip-table")
-        .withPath(basePath())
-        .withSchema(TRIP_EXAMPLE_SCHEMA)
-        .withParallelism(2, 2)
-        .withAutoCommit(true)
-        .withEmbeddedTimelineServerEnabled(enableTimelineServer)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withLayoutConfig(HoodieLayoutConfig.newBuilder()
-            .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
-            .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
-        .build();
-    props.putAll(config.getProps());
-
-    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
-    client = getHoodieWriteClient(config);
-
-    final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
-    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+    try {
+      // disable the file group reader for this test because MOR records appear to be processed in a different order
+      jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
+      Properties props = getPropertiesForKeyGen(true);
+      HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+          .forTable("test-trip-table")
+          .withPath(basePath())
+          .withSchema(TRIP_EXAMPLE_SCHEMA)
+          .withParallelism(2, 2)
+          .withAutoCommit(true)
+          .withEmbeddedTimelineServerEnabled(enableTimelineServer)
+          .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+          .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+              .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+          .withLayoutConfig(HoodieLayoutConfig.newBuilder()
+              .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
+              .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
+          .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+          .build();
+      props.putAll(config.getProps());
+
+      metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
+      client = getHoodieWriteClient(config);
+
+      final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
+      JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+
+      // initialize 100 records
+      client.upsert(writeRecords, client.startCommit());
+      // update 100 records
+      client.upsert(writeRecords, client.startCommit());
+      // schedule compaction
+      client.scheduleCompaction(Option.empty());
+      // delete 50 records
+      List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
+      JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
+      client.delete(deleteRecords, client.startCommit());
+      // insert the same 100 records again
+      client.upsert(writeRecords, client.startCommit());
+      Assertions.assertEquals(100, readTableTotalRecordsNum());
+    } finally {
+      jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
+    }
-
-    // initialize 100 records
-    client.upsert(writeRecords, client.startCommit());
-    // update 100 records
-    client.upsert(writeRecords, client.startCommit());
-    // schedule compaction
-    client.scheduleCompaction(Option.empty());
-    // delete 50 records
-    List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
-    JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
-    client.delete(deleteRecords, client.startCommit());
-    // insert the same 100 records again
-    client.upsert(writeRecords, client.startCommit());
-    Assertions.assertEquals(100, readTableTotalRecordsNum());
   }

   private long readTableTotalRecordsNum() {
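For context, the test above wraps its body in try/finally so the file group reader is re-enabled even when an assertion fails. A minimal sketch of that toggle as a reusable helper, assuming a JavaSparkContext named jsc; the helper class and method names are hypothetical, not part of this commit:

    import org.apache.hudi.common.config.HoodieReaderConfig;
    import org.apache.spark.api.java.JavaSparkContext;

    final class FileGroupReaderToggle {
      // Runs the given body with the file group reader disabled, then re-enables it,
      // mirroring the try/finally pattern in the test above.
      static void withFileGroupReaderDisabled(JavaSparkContext jsc, Runnable body) {
        jsc.hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
        try {
          body.run();
        } finally {
          jsc.hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
        }
      }
    }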
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 218e0eb4b03..b79562f8b43 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.storage.HoodieStorage;
@@ -38,6 +39,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.function.UnaryOperator;

+import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
+
 /**
  * An abstract reader context class for {@code HoodieFileGroupReader} to use, containing APIs for
  * engine-specific implementation on reading data files, getting field values from a record,
@@ -197,7 +200,10 @@ public abstract class HoodieReaderContext<T> {
    * @param schema The Avro schema of the record.
    * @return The record key in String.
    */
-  public abstract String getRecordKey(T record, Schema schema);
+  public String getRecordKey(T record, Schema schema) {
+    Object val = getValue(record, schema, RECORD_KEY_METADATA_FIELD);
+    return val.toString();
+  }

   /**
    * Gets the ordering value in particular type.
@@ -208,10 +214,23 @@ public abstract class HoodieReaderContext<T> {
    * @param props Properties.
    * @return The ordering value.
    */
-  public abstract Comparable getOrderingValue(Option<T> recordOption,
-                                              Map<String, Object> metadataMap,
-                                              Schema schema,
-                                              TypedProperties props);
+  public Comparable getOrderingValue(Option<T> recordOption,
+                                     Map<String, Object> metadataMap,
+                                     Schema schema,
+                                     TypedProperties props) {
+    if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
+      return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
+    }
+
+    if (!recordOption.isPresent()) {
+      return 0;
+    }
+
+    String orderingFieldName = ConfigUtils.getOrderingField(props);
+    Object value = getValue(recordOption.get(), schema, orderingFieldName);
+    return value != null ? (Comparable) value : 0;
+  }

   /**
    * Constructs a new {@link HoodieRecord} based on the record of engine-specific type and metadata for merging.
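The new default getOrderingValue() resolves in three steps: a value already stashed in the metadata map wins, an absent record falls back to 0, and otherwise the configured ordering field is read from the record, with null also falling back to 0. A hedged standalone sketch of that fallback chain (the class, the key parameter, and the accessor function are illustrative stand-ins, not the Hudi API):

    import java.util.Map;
    import java.util.Optional;
    import java.util.function.BiFunction;

    final class OrderingValueFallback {
      // Mirrors the fallback order of the new default implementation:
      // 1) precomputed value in the metadata map, 2) missing record -> 0,
      // 3) ordering field value from the record, null -> 0.
      static Comparable<?> resolve(Map<String, Object> metadataMap,
                                   String internalOrderingKey,
                                   Optional<Object> record,
                                   String orderingFieldName,
                                   BiFunction<Object, String, Object> getValue) {
        if (metadataMap.containsKey(internalOrderingKey)) {
          return (Comparable<?>) metadataMap.get(internalOrderingKey);
        }
        if (!record.isPresent()) {
          return 0;
        }
        Object value = getValue.apply(record.get(), orderingFieldName);
        return value != null ? (Comparable<?>) value : 0;
      }
    }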
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 76e640d927a..442419d0713 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -476,6 +476,6 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
   }

   public enum HoodieRecordType {
-    AVRO, SPARK
+    AVRO, SPARK, HIVE
   }
 }
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index 9b087482c72..e8e92f6b420 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -61,6 +61,8 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;

+import static org.apache.hudi.common.fs.FSUtils.LOG_FILE_PATTERN;
+
 /**
  * Utility functions related to accessing the file storage on Hadoop.
  */
@@ -377,13 +379,24 @@ public class HadoopFSUtils {
    * the file name.
    */
   public static String getFileIdFromLogPath(Path path) {
-    Matcher matcher = FSUtils.LOG_FILE_PATTERN.matcher(path.getName());
+    Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
     if (!matcher.find()) {
       throw new InvalidHoodiePathException(path.toString(), "LogFile");
     }
     return matcher.group(1);
   }

+  /**
+   * Gets the second part of the log file name, which is the delta commit time.
+   */
+  public static String getDeltaCommitTimeFromLogPath(Path path) {
+    Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
+    if (!matcher.find()) {
+      throw new InvalidHoodiePathException(path.toString(), "LogFile");
+    }
+    return matcher.group(2);
+  }
+
   /**
    * Check if the file is a base file or a log file, then get the fileId appropriately.
    */
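The new helper reuses the same regex as getFileIdFromLogPath but reads the second capture group instead of the first. A hedged standalone sketch of that extraction; the pattern and sample file name below are illustrative stand-ins (the real pattern lives in FSUtils.LOG_FILE_PATTERN and is broader):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    final class LogNameParseSketch {
      // Illustrative stand-in for FSUtils.LOG_FILE_PATTERN.
      static final Pattern LOG_FILE_PATTERN =
          Pattern.compile("\\.(.*)_(.*)\\.(log)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+)))?");

      public static void main(String[] args) {
        String name = ".fileid0_20240608222246.log.1_0-1-0"; // hypothetical log file name
        Matcher m = LOG_FILE_PATTERN.matcher(name);
        if (m.find()) {
          System.out.println("fileId = " + m.group(1));          // group 1: file id
          System.out.println("deltaCommitTime = " + m.group(2)); // group 2: delta commit time
        }
      }
    }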
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
new file mode 100644
index 00000000000..904d4882cc9
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getPartitionFieldNames;
+
+/**
+ * {@link HoodieReaderContext} for the Hive-specific {@link HoodieFileGroupReaderBasedRecordReader}.
+ */
+public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable> {
+  protected final HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator;
+  protected final InputSplit split;
+  protected final JobConf jobConf;
+  protected final Reporter reporter;
+  protected final Schema writerSchema;
+  protected Map<String, String[]> hosts;
+  protected final Map<String, TypeInfo> columnTypeMap;
+  private final ObjectInspectorCache objectInspectorCache;
+  private RecordReader<NullWritable, ArrayWritable> firstRecordReader = null;
+
+  private final List<String> partitionCols;
+  private final Set<String> partitionColSet;
+
+  private final String recordKeyField;
+
+  protected HiveHoodieReaderContext(HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator,
+                                    InputSplit split,
+                                    JobConf jobConf,
+                                    Reporter reporter,
+                                    Schema writerSchema,
+                                    Map<String, String[]> hosts,
+                                    HoodieTableMetaClient metaClient) {
+    this.readerCreator = readerCreator;
+    this.split = split;
+    this.jobConf = jobConf;
+    this.reporter = reporter;
+    this.writerSchema = writerSchema;
+    this.hosts = hosts;
+    this.partitionCols = getPartitionFieldNames(jobConf).stream().filter(n -> writerSchema.getField(n) != null).collect(Collectors.toList());
+    this.partitionColSet = new HashSet<>(this.partitionCols);
+    String tableName = metaClient.getTableConfig().getTableName();
+    recordKeyField = getRecordKeyField(metaClient);
+    this.objectInspectorCache = HoodieArrayWritableAvroUtils.getCacheForTable(tableName, writerSchema, jobConf);
+    this.columnTypeMap = objectInspectorCache.getColumnTypeMap();
+  }
+
+  /**
+   * If populateMetaFields is false, then getRecordKeyFields()
+   * should return exactly one record key field.
+   */
+  private static String getRecordKeyField(HoodieTableMetaClient metaClient) {
+    if (metaClient.getTableConfig().populateMetaFields()) {
+      return HoodieRecord.RECORD_KEY_METADATA_FIELD;
+    }
+
+    Option<String[]> recordKeyFieldsOpt = metaClient.getTableConfig().getRecordKeyFields();
+    ValidationUtils.checkArgument(recordKeyFieldsOpt.isPresent(), "No record key field set in table config, but populateMetaFields is disabled");
+    ValidationUtils.checkArgument(recordKeyFieldsOpt.get().length == 1, "More than one record key set in table config, but populateMetaFields is disabled");
+    return recordKeyFieldsOpt.get()[0];
+  }
+
+  private void setSchemas(JobConf jobConf, Schema dataSchema, Schema requiredSchema) {
+    List<String> dataColumnNameList = dataSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toList());
+    List<TypeInfo> dataColumnTypeList = dataColumnNameList.stream().map(fieldName -> {
+      TypeInfo type = columnTypeMap.get(fieldName);
+      if (type == null) {
+        throw new IllegalArgumentException("Field: " + fieldName + " does not have a defined type");
+      }
+      return type;
+    }).collect(Collectors.toList());
+    jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", dataColumnNameList));
+    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, dataColumnTypeList.stream().map(TypeInfo::getQualifiedName).collect(Collectors.joining(",")));
+    // don't replace `f -> f.name()` with a method reference
+    String readColNames = requiredSchema.getFields().stream().map(f -> f.name()).collect(Collectors.joining(","));
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNames);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, requiredSchema.getFields()
+        .stream().map(f -> String.valueOf(dataSchema.getField(f.name()).pos())).collect(Collectors.joining(",")));
+  }
+
+  @Override
+  public HoodieStorage getStorage(String path, StorageConfiguration<?> conf) {
+    return HoodieStorageUtils.getStorage(path, conf);
+  }
+
+  @Override
+  public ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath filePath, long start, long length, Schema dataSchema, Schema requiredSchema, HoodieStorage storage) throws IOException {
+    JobConf jobConfCopy = new JobConf(jobConf);
+    // move the partition cols to the end, because in some cases there are issues if we don't
+    Schema modifiedDataSchema = HoodieAvroUtils.generateProjectionSchema(dataSchema, Stream.concat(dataSchema.getFields().stream()
+            .map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColSet.contains(n)),
+        partitionCols.stream().filter(c -> dataSchema.getField(c) != null)).collect(Collectors.toList()));
+    setSchemas(jobConfCopy, modifiedDataSchema, requiredSchema);
+    InputSplit inputSplit = new FileSplit(new Path(filePath.toString()), start, length, hosts.get(filePath.toString()));
+    RecordReader<NullWritable, ArrayWritable> recordReader = readerCreator.getRecordReader(inputSplit, jobConfCopy, reporter);
+    if (firstRecordReader == null) {
+      firstRecordReader = recordReader;
+    }
+    ClosableIterator<ArrayWritable> recordIterator = new RecordReaderValueIterator<>(recordReader);
+    if (modifiedDataSchema.equals(requiredSchema)) {
+      return recordIterator;
+    }
+    // the record reader puts the required columns in the positions of the data schema and nulls the rest of the columns
+    return new CloseableMappingIterator<>(recordIterator, projectRecord(modifiedDataSchema, requiredSchema));
+  }
+
+  @Override
+  public ArrayWritable convertAvroRecord(IndexedRecord avroRecord) {
+    return (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroRecord.getSchema(), true);
+  }
+
+  @Override
+  public HoodieRecordMerger getRecordMerger(String mergerStrategy) {
+    if (mergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)) {
+      return new HoodieHiveRecordMerger();
+    }
+    throw new HoodieException(String.format("The merger strategy UUID is not supported. Default: %s, Passed: %s", DEFAULT_MERGER_STRATEGY_UUID, mergerStrategy));
+  }
+
+  @Override
+  public String getRecordKey(ArrayWritable record, Schema schema) {
+    return getValue(record, schema, recordKeyField).toString();
+  }
+
+  @Override
+  public Object getValue(ArrayWritable record, Schema schema, String fieldName) {
+    return StringUtils.isNullOrEmpty(fieldName) ? null : objectInspectorCache.getValue(record, schema, fieldName);
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> constructHoodieRecord(Option<ArrayWritable> recordOption, Map<String, Object> metadataMap) {
+    if (!recordOption.isPresent()) {
+      return new HoodieEmptyRecord<>(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), HoodieRecord.HoodieRecordType.HIVE);
+    }
+    Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
+    ArrayWritable writable = recordOption.get();
+    return new HoodieHiveRecord(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), writable, schema, objectInspectorCache);
+  }
+
+  @Override
+  public ArrayWritable seal(ArrayWritable record) {
+    return new ArrayWritable(Writable.class, Arrays.copyOf(record.get(), record.get().length));
+  }
+
+  @Override
+  public ClosableIterator<ArrayWritable> mergeBootstrapReaders(ClosableIterator<ArrayWritable> skeletonFileIterator,
+                                                               Schema skeletonRequiredSchema,
+                                                               ClosableIterator<ArrayWritable> dataFileIterator,
+                                                               Schema dataRequiredSchema) {
+    int skeletonLen = skeletonRequiredSchema.getFields().size();
+    int dataLen = dataRequiredSchema.getFields().size();
+    return new ClosableIterator<ArrayWritable>() {
+
+      private final ArrayWritable returnWritable = new ArrayWritable(Writable.class);
+
+      @Override
+      public boolean hasNext() {
+        if (dataFileIterator.hasNext() != skeletonFileIterator.hasNext()) {
+          throw new IllegalStateException("bootstrap data file iterator and skeleton file iterator are out of sync");
+        }
+        return dataFileIterator.hasNext();
+      }
+
+      @Override
+      public ArrayWritable next() {
+        Writable[] skeletonWritable = skeletonFileIterator.next().get();
+        Writable[] dataWritable = dataFileIterator.next().get();
+        Writable[] mergedWritable = new Writable[skeletonLen + dataLen];
+        System.arraycopy(skeletonWritable, 0, mergedWritable, 0, skeletonLen);
+        System.arraycopy(dataWritable, 0, mergedWritable, skeletonLen, dataLen);
+        returnWritable.set(mergedWritable);
+        return returnWritable;
+      }
+
+      @Override
+      public void close() {
+        skeletonFileIterator.close();
+        dataFileIterator.close();
+      }
+    };
+  }
+
+  @Override
+  public UnaryOperator<ArrayWritable> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns) {
+    if (!renamedColumns.isEmpty()) {
+      throw new IllegalStateException("Schema evolution is not currently supported in the file group reader for Hive");
+    }
+    return HoodieArrayWritableAvroUtils.projectRecord(from, to);
+  }
+
+  public UnaryOperator<ArrayWritable> reverseProjectRecord(Schema from, Schema to) {
+    return HoodieArrayWritableAvroUtils.reverseProject(from, to);
+  }
+
+  public long getPos() throws IOException {
+    if (firstRecordReader != null) {
+      return firstRecordReader.getPos();
+    }
+    throw new IllegalStateException("getPos() should not be called before a record reader has been initialized");
+  }
+
+  public float getProgress() throws IOException {
+    if (firstRecordReader != null) {
+      return firstRecordReader.getProgress();
+    }
+    throw new IllegalStateException("getProgress() should not be called before a record reader has been initialized");
+  }
+
+}
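mergeBootstrapReaders above stitches each skeleton row (the meta-field columns) to its data row positionally, relying on both iterators staying in lockstep. A standalone sketch of just the concatenation step; the class name and sample values are illustrative:

    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.Writable;

    final class BootstrapMergeSketch {
      // Concatenates skeleton columns (meta fields) with data columns,
      // mirroring the positional System.arraycopy calls in mergeBootstrapReaders.
      static ArrayWritable merge(ArrayWritable skeleton, ArrayWritable data) {
        Writable[] s = skeleton.get();
        Writable[] d = data.get();
        Writable[] merged = new Writable[s.length + d.length];
        System.arraycopy(s, 0, merged, 0, s.length);
        System.arraycopy(d, 0, merged, s.length, d.length);
        return new ArrayWritable(Writable.class, merged);
      }

      public static void main(String[] args) {
        ArrayWritable skeleton = new ArrayWritable(new String[] {"20240608222246", "key1"});
        ArrayWritable data = new ArrayWritable(new String[] {"rider-A", "driver-B"});
        System.out.println(merge(skeleton, data).toStrings().length); // 4
      }
    }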
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
new file mode 100644
index 00000000000..efbf68c8e0f
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static org.apache.hudi.common.fs.FSUtils.getCommitTime;
+import static org.apache.hudi.common.fs.FSUtils.getFileId;
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePathInfo;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getDeltaCommitTimeFromLogPath;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getFileIdFromLogPath;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getFs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getRelativePartitionPath;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.isLogFile;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getPartitionFieldNames;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableBasePath;
+
+/**
+ * {@link HoodieFileGroupReader} based implementation of Hive's {@link RecordReader} for {@link ArrayWritable}.
+ */
+public class HoodieFileGroupReaderBasedRecordReader implements RecordReader<NullWritable, ArrayWritable> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieFileGroupReaderBasedRecordReader.class);
+
+  public interface HiveReaderCreator {
+    org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> getRecordReader(
+        final org.apache.hadoop.mapred.InputSplit split,
+        final org.apache.hadoop.mapred.JobConf job,
+        final org.apache.hadoop.mapred.Reporter reporter
+    ) throws IOException;
+  }
+
+  private final HiveHoodieReaderContext readerContext;
+  private final HoodieFileGroupReader<ArrayWritable> fileGroupReader;
+  private final ArrayWritable arrayWritable;
+  private final NullWritable nullWritable = NullWritable.get();
+  private final InputSplit inputSplit;
+  private final JobConf jobConfCopy;
+  private final UnaryOperator<ArrayWritable> reverseProjection;
+
+  public HoodieFileGroupReaderBasedRecordReader(HiveReaderCreator readerCreator,
+                                                final InputSplit split,
+                                                final JobConf jobConf,
+                                                final Reporter reporter) throws IOException {
+    this.jobConfCopy = new JobConf(jobConf);
+    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
+    Set<String> partitionColumns = new HashSet<>(getPartitionFieldNames(jobConfCopy));
+    this.inputSplit = split;
+
+    FileSplit fileSplit = (FileSplit) split;
+    String tableBasePath = getTableBasePath(split, jobConfCopy);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(getStorageConf(jobConfCopy))
+        .setBasePath(tableBasePath)
+        .build();
+    String latestCommitTime = getLatestCommitTime(split, metaClient);
+    Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, latestCommitTime);
+    Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
+    Map<String, String[]> hosts = new HashMap<>();
+    this.readerContext = new HiveHoodieReaderContext(readerCreator, split, jobConfCopy, reporter, tableSchema, hosts, metaClient);
+    this.arrayWritable = new ArrayWritable(Writable.class, new Writable[requestedSchema.getFields().size()]);
+    // get some config values
+    long maxMemoryForMerge = jobConf.getLong(MAX_MEMORY_FOR_MERGE.key(), MAX_MEMORY_FOR_MERGE.defaultValue());
+    String spillableMapPath = jobConf.get(SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath());
+    ExternalSpillableMap.DiskMapType spillMapType = ExternalSpillableMap.DiskMapType.valueOf(jobConf.get(SPILLABLE_DISK_MAP_TYPE.key(),
+        SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
+    boolean bitmaskCompressEnabled = jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+        DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
+    LOG.debug("Creating HoodieFileGroupReaderBasedRecordReader with tableBasePath={}, latestCommitTime={}, fileSplit={}", tableBasePath, latestCommitTime, fileSplit.getPath());
+    this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, metaClient.getStorage(), tableBasePath,
+        latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, getFs(tableBasePath, jobConfCopy), tableBasePath),
+        tableSchema, requestedSchema, Option.empty(), metaClient, metaClient.getTableConfig().getProps(), metaClient.getTableConfig(), fileSplit.getStart(),
+        fileSplit.getLength(), false, maxMemoryForMerge, spillableMapPath, spillMapType, bitmaskCompressEnabled);
+    this.fileGroupReader.initRecordIterators();
+    // the output expects the partition columns to be at the end
+    Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema,
+        Stream.concat(tableSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColumns.contains(n)),
+            partitionColumns.stream()).collect(Collectors.toList()));
+    this.reverseProjection = readerContext.reverseProjectRecord(requestedSchema, outputSchema);
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws IOException {
+    if (!fileGroupReader.hasNext()) {
+      return false;
+    }
+    value.set(fileGroupReader.next().get());
+    reverseProjection.apply(value);
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return nullWritable;
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return arrayWritable;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return readerContext.getPos();
+  }
+
+  @Override
+  public void close() throws IOException {
+    fileGroupReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return readerContext.getProgress();
+  }
+
+  public RealtimeSplit getSplit() {
+    return (RealtimeSplit) inputSplit;
+  }
+
+  public JobConf getJobConf() {
+    return jobConfCopy;
+  }
+
+  private static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, JobConf jobConf, String latestCommitTime) {
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+    try {
+      Schema schema = tableSchemaResolver.getTableAvroSchema(latestCommitTime);
+      // Add partitioning fields to writer schema for resulting row to contain null values for these fields
+      return HoodieRealtimeRecordReaderUtils.addPartitionFields(schema, getPartitionFieldNames(jobConf));
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to get table schema", e);
+    }
+  }
+
+  private static String getLatestCommitTime(InputSplit split, HoodieTableMetaClient metaClient) {
+    if (split instanceof RealtimeSplit) {
+      return ((RealtimeSplit) split).getMaxCommitTime();
+    }
+    Option<HoodieInstant> lastInstant = metaClient.getCommitsTimeline().lastInstant();
+    if (lastInstant.isPresent()) {
+      return lastInstant.get().getTimestamp();
+    } else {
+      return EMPTY_STRING;
+    }
+  }
+
+  /**
+   * Convert FileSplit to FileSlice, but save the locations in 'hosts' because that data is otherwise lost.
+   */
+  private static FileSlice getFileSliceFromSplit(FileSplit split, Map<String, String[]> hosts, FileSystem fs, String tableBasePath) throws IOException {
+    BaseFile bootstrapBaseFile = createBootstrapBaseFile(split, hosts, fs);
+    if (split instanceof RealtimeSplit) {
+      // MOR
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      boolean isLogFile = isLogFile(realtimeSplit.getPath());
+      String fileID;
+      String commitTime;
+      if (isLogFile) {
+        fileID = getFileIdFromLogPath(realtimeSplit.getPath());
+        commitTime = getDeltaCommitTimeFromLogPath(realtimeSplit.getPath());
+      } else {
+        fileID = getFileId(realtimeSplit.getPath().getName());
+        commitTime = getCommitTime(realtimeSplit.getPath().toString());
+      }
+      HoodieFileGroupId fileGroupId = new HoodieFileGroupId(getRelativePartitionPath(new Path(realtimeSplit.getBasePath()), realtimeSplit.getPath()), fileID);
+      if (isLogFile) {
+        return new FileSlice(fileGroupId, commitTime, null, realtimeSplit.getDeltaLogFiles());
+      }
+      hosts.put(realtimeSplit.getPath().toString(), realtimeSplit.getLocations());
+      HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(realtimeSplit.getPath())), bootstrapBaseFile);
+      return new FileSlice(fileGroupId, commitTime, hoodieBaseFile, realtimeSplit.getDeltaLogFiles());
+    }
+    // COW
+    HoodieFileGroupId fileGroupId = new HoodieFileGroupId(getRelativePartitionPath(new Path(tableBasePath), split.getPath()), getFileId(split.getPath().getName()));
+    hosts.put(split.getPath().toString(), split.getLocations());
+    return new FileSlice(
+        fileGroupId,
+        getCommitTime(split.getPath().toString()),
+        new HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(split.getPath())), bootstrapBaseFile),
+        Collections.emptyList());
+  }
+
+  private static BaseFile createBootstrapBaseFile(FileSplit split, Map<String, String[]> hosts, FileSystem fs) throws IOException {
+    if (split instanceof BootstrapBaseFileSplit) {
+      BootstrapBaseFileSplit bootstrapBaseFileSplit = (BootstrapBaseFileSplit) split;
+      FileSplit bootstrapFileSplit = bootstrapBaseFileSplit.getBootstrapFileSplit();
+      hosts.put(bootstrapFileSplit.getPath().toString(), bootstrapFileSplit.getLocations());
+      return new BaseFile(convertToStoragePathInfo(fs.getFileStatus(bootstrapFileSplit.getPath())));
+    }
+    return null;
+  }
+
+  private static Schema createRequestedSchema(Schema tableSchema, JobConf jobConf) {
+    String readCols = jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+    if (StringUtils.isNullOrEmpty(readCols)) {
+      Schema emptySchema = Schema.createRecord(tableSchema.getName(), tableSchema.getDoc(),
+          tableSchema.getNamespace(), tableSchema.isError());
+      emptySchema.setFields(Collections.emptyList());
+      return emptySchema;
+    }
+    // Hive will handle the partition cols
+    String partitionColString = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+    Set<String> partitionColumns;
+    if (partitionColString == null) {
+      partitionColumns = Collections.emptySet();
+    } else {
+      partitionColumns = Arrays.stream(partitionColString.split(",")).collect(Collectors.toSet());
+    }
+    // if the partition columns are actually written to the file, then it is ok to read them from the file
+    tableSchema.getFields().forEach(f -> partitionColumns.remove(f.name().toLowerCase(Locale.ROOT)));
+    return HoodieAvroUtils.generateProjectionSchema(tableSchema,
+        Arrays.stream(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR).split(",")).filter(c -> !partitionColumns.contains(c)).collect(Collectors.toList()));
+  }
+}
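A consumer drives this class through the standard mapred RecordReader contract, reusing the holders from createKey()/createValue() across calls. A hedged sketch of that calling pattern; reader construction is elided, since it needs a real split and JobConf:

    import java.io.IOException;

    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.RecordReader;

    final class ReaderLoopSketch {
      // Standard old-API consumption loop: reuse the key/value holders across calls.
      static long countRows(RecordReader<NullWritable, ArrayWritable> reader) throws IOException {
        NullWritable key = reader.createKey();
        ArrayWritable value = reader.createValue();
        long rows = 0;
        try {
          while (reader.next(key, value)) {
            rows++; // `value` now holds the projected columns in Hive's expected order
          }
        } finally {
          reader.close();
        }
        return rows;
      }
    }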
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
new file mode 100644
index 00000000000..a2fb08fd614
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * {@link HoodieRecord} implementation for Hive records of {@link ArrayWritable}.
+ */
+public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
+
+  private boolean copy;
+  private final boolean isDeleted;
+
+  public boolean isDeleted() {
+    return isDeleted;
+  }
+
+  private final ArrayWritableObjectInspector objectInspector;
+
+  private final ObjectInspectorCache objectInspectorCache;
+
+  protected Schema schema;
+
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, ObjectInspectorCache objectInspectorCache) {
+    super(key, data);
+    this.objectInspector = objectInspectorCache.getObjectInspector(schema);
+    this.objectInspectorCache = objectInspectorCache;
+    this.schema = schema;
+    this.copy = false;
+    isDeleted = data == null;
+  }
+
+  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HoodieOperation operation, boolean isCopy,
+                           ArrayWritableObjectInspector objectInspector, ObjectInspectorCache objectInspectorCache) {
+    super(key, data, operation, Option.empty());
+    this.schema = schema;
+    this.copy = isCopy;
+    isDeleted = data == null;
+    this.objectInspector = objectInspector;
+    this.objectInspectorCache = objectInspectorCache;
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> newInstance() {
+    return new HoodieHiveRecord(this.key, this.data, this.schema, this.operation, this.copy, this.objectInspector, this.objectInspectorCache);
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> newInstance(HoodieKey key, HoodieOperation op) {
+    throw new UnsupportedOperationException("ObjectInspector is needed for HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> newInstance(HoodieKey key) {
+    throw new UnsupportedOperationException("ObjectInspector is needed for HoodieHiveRecord");
+  }
+
+  @Override
+  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
+    String orderingField = ConfigUtils.getOrderingField(props);
+    if (orderingField == null) {
+      return 0;
+    }
+    return (Comparable<?>) getValue(orderingField);
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.HIVE;
+  }
+
+  @Override
+  public String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public String getRecordKey(Schema recordSchema, String keyFieldName) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  protected void writeRecordPayload(ArrayWritable payload, Kryo kryo, Output output) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  protected ArrayWritable readRecordPayload(Kryo kryo, Input input) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+    Object[] objects = new Object[columns.length];
+    for (int i = 0; i < objects.length; i++) {
+      objects[i] = getValue(columns[i]);
+    }
+    return objects;
+  }
+
+  @Override
+  public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public boolean isDelete(Schema recordSchema, Properties props) throws IOException {
+    if (null == data) {
+      return true;
+    }
+    if (recordSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) == null) {
+      return false;
+    }
+    Object deleteMarker = getValue(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+    return deleteMarker instanceof BooleanWritable && ((BooleanWritable) deleteMarker).get();
+  }
+
+  @Override
+  public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
+    return false;
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> copy() {
+    if (!copy) {
+      this.data = new ArrayWritable(Writable.class, Arrays.copyOf(this.data.get(), this.data.get().length));
+      this.copy = true;
+    }
+    return this;
+  }
+
+  @Override
+  public Option<Map<String, String>> getMetadata() {
+    // TODO HUDI-5282 support metaData
+    return Option.empty();
+  }
+
+  @Override
+  public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(Schema recordSchema, Properties props, Option<Pair<String, String>> simpleKeyGenFieldsOpt, Boolean withOperation,
+                                                            Option<String> partitionNameOp, Boolean populateMetaFieldsOp, Option<Schema> schemaWithoutMetaFields) throws IOException {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, Properties props, Option<BaseKeyGenerator> keyGen) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) throws IOException {
+    data.get()[recordSchema.getIndexNamed(keyFieldName)] = new Text();
+    return this;
+  }
+
+  @Override
+  public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) throws IOException {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  private Object getValue(String name) {
+    return HoodieArrayWritableAvroUtils.getWritableValue(data, objectInspector, name);
+  }
+
+  protected Schema getSchema() {
+    return schema;
+  }
+}
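copy() above is lazy: the backing Writable array is cloned at most once, after which the record marks itself safe to mutate. A minimal standalone illustration of that copy-on-first-use pattern (the class here is illustrative, not Hudi API):

    import java.util.Arrays;

    final class CopyOnFirstUse {
      private int[] data;
      private boolean copied;

      CopyOnFirstUse(int[] shared) {
        this.data = shared; // may alias a buffer the reader reuses between rows
      }

      // Clone the backing array at most once, mirroring HoodieHiveRecord.copy().
      CopyOnFirstUse copy() {
        if (!copied) {
          data = Arrays.copyOf(data, data.length);
          copied = true;
        }
        return this;
      }
    }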
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java
new file mode 100644
index 00000000000..17a4738569e
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+public class HoodieHiveRecordMerger implements HoodieRecordMerger {
+  @Override
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
+    ValidationUtils.checkArgument(older.getRecordType() == HoodieRecord.HoodieRecordType.HIVE);
+    ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecord.HoodieRecordType.HIVE);
+    if (newer instanceof HoodieHiveRecord) {
+      HoodieHiveRecord newHiveRecord = (HoodieHiveRecord) newer;
+      if (newHiveRecord.isDeleted()) {
+        return Option.empty();
+      }
+    } else if (newer.getData() == null) {
+      return Option.empty();
+    }
+
+    if (older instanceof HoodieHiveRecord) {
+      HoodieHiveRecord oldHiveRecord = (HoodieHiveRecord) older;
+      if (oldHiveRecord.isDeleted()) {
+        return Option.of(Pair.of(newer, newSchema));
+      }
+    } else if (older.getData() == null) {
+      return Option.of(Pair.of(newer, newSchema));
+    }
+    if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+      return Option.of(Pair.of(older, oldSchema));
+    } else {
+      return Option.of(Pair.of(newer, newSchema));
+    }
+  }
+
+  @Override
+  public HoodieRecord.HoodieRecordType getRecordType() {
+    return HoodieRecord.HoodieRecordType.HIVE;
+  }
+
+  @Override
+  public String getMergingStrategy() {
+    return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+  }
+}
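The merger implements last-writer-wins on the ordering value: the older record survives only when its ordering value is strictly greater, so ties go to the newer record. A minimal standalone illustration of that comparison rule:

    final class MergeRuleSketch {
      // Keep the older record only when its ordering value is strictly greater;
      // otherwise the newer record wins, including on ties.
      static <T extends Comparable<T>> String pick(T olderOrdering, T newerOrdering) {
        return olderOrdering.compareTo(newerOrdering) > 0 ? "older" : "newer";
      }

      public static void main(String[] args) {
        System.out.println(pick(2L, 1L)); // older
        System.out.println(pick(1L, 1L)); // newer (tie goes to the newer record)
        System.out.println(pick(1L, 2L)); // newer
      }
    }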
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 9e656529904..18b9e221978 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -20,10 +20,15 @@ package org.apache.hudi.hadoop;

 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.avro.HoodieTimestampAwareParquetInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -35,9 +40,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.parquet.hadoop.ParquetInputFormat;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -48,6 +51,11 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

+import static org.apache.hudi.common.util.TablePathUtils.getTablePath;
+import static org.apache.hudi.common.util.TablePathUtils.isHoodieTablePath;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.shouldUseFilegroupReader;
+
 /**
  * HoodieInputFormat which understands the Hoodie File Structure and filters files based on the Hoodie Mode. If paths
  * that does not correspond to a hoodie table then they are passed in as is (as what FileInputFormat.listStatus()
@@ -91,9 +99,42 @@ public class HoodieParquetInputFormat extends HoodieParquetInputFormatBase {
     }
   }

+  private static boolean checkIfHudiTable(final InputSplit split, final JobConf job) {
+    try {
+      Path inputPath = ((FileSplit) split).getPath();
+      FileSystem fs = inputPath.getFileSystem(job);
+      HoodieStorage storage = new HoodieHadoopStorage(fs);
+      return getTablePath(storage, convertToStoragePath(inputPath))
+          .map(path -> isHoodieTablePath(storage, path)).orElse(false);
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
   @Override
   public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
                                                                    final Reporter reporter) throws IOException {
+    HoodieRealtimeInputFormatUtils.addProjectionField(job, job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").split("/"));
+    if (shouldUseFilegroupReader(job)) {
+      try {
+        if (!(split instanceof FileSplit) || !checkIfHudiTable(split, job)) {
+          return super.getRecordReader(split, job, reporter);
+        }
+        if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) {
+          return new HoodieFileGroupReaderBasedRecordReader((s, j, r) -> {
+            try {
+              return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(), s, j, r);
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }, split, job, reporter);
+        } else {
+          return new HoodieFileGroupReaderBasedRecordReader(super::getRecordReader, split, job, reporter);
+        }
+      } catch (final IOException e) {
+        throw new RuntimeException("Cannot create a RecordReaderWrapper", e);
+      }
+    }
     // TODO enable automatic predicate pushdown after fixing issues
     // FileSplit fileSplit = (FileSplit) split;
     // HoodieTableMetadata metadata = getTableMetadata(fileSplit.getPath().getParent());
@@ -117,7 +158,6 @@ public class HoodieParquetInputFormat extends HoodieParquetInputFormatBase {
       LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split);
     }

-    HoodieRealtimeInputFormatUtils.addProjectionField(job, job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").split("/"));
     return getRecordReaderInternal(split, job, reporter);
   }
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
index 7ffa3bf555c..c08c358c0c8 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.mapred.RecordReader;
@@ -25,7 +26,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Iterator;
import java.util.NoSuchElementException;
/**
@@ -34,7 +34,7 @@ import java.util.NoSuchElementException;
* @param <K> Key Type
* @param <V> Value Type
*/
-public class RecordReaderValueIterator<K, V> implements Iterator<V> {
+public class RecordReaderValueIterator<K, V> implements ClosableIterator<V> {
private static final Logger LOG =
LoggerFactory.getLogger(RecordReaderValueIterator.class);
@@ -79,7 +79,12 @@ public class RecordReaderValueIterator<K, V> implements
Iterator<V> {
return retVal;
}
- public void close() throws IOException {
- this.reader.close();
+ @Override
+ public void close() {
+ try {
+ this.reader.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Could not close reader", e);
+ }
}
}
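
Since RecordReaderValueIterator now implements ClosableIterator (which in
Hudi extends Iterator and AutoCloseable), callers can scope the underlying
RecordReader with try-with-resources. A minimal usage sketch, assuming the
class's existing single-argument constructor and an already-opened reader:

    // 'recordReader' is an assumed RecordReader<NullWritable, ArrayWritable>.
    try (RecordReaderValueIterator<NullWritable, ArrayWritable> it =
        new RecordReaderValueIterator<>(recordReader)) {
      while (it.hasNext()) {
        ArrayWritable row = it.next();
        // process row ...
      }
    } // close() now wraps any IOException from the reader in a RuntimeException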
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
index 1edf29d45d5..b89e69f4be8 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
@@ -19,8 +19,10 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader;
import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
@@ -35,6 +37,8 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.shouldUseFilegroupReader;
+
/**
 * Allows reading multiple realtime file splits grouped together by
CombineInputFormat.
*/
@@ -42,19 +46,29 @@ public class HoodieCombineRealtimeRecordReader implements
RecordReader<NullWrita
private static final transient Logger LOG =
LoggerFactory.getLogger(HoodieCombineRealtimeRecordReader.class);
// RecordReaders for each split
- List<HoodieRealtimeRecordReader> recordReaders = new LinkedList<>();
+ private List<RecordReader> recordReaders = new LinkedList<>();
// Points to the currently iterating record reader
- HoodieRealtimeRecordReader currentRecordReader;
+ private RecordReader currentRecordReader;
+
+ private final boolean useFileGroupReader;
public HoodieCombineRealtimeRecordReader(JobConf jobConf, CombineFileSplit
split,
List<RecordReader> readers) {
+ useFileGroupReader = shouldUseFilegroupReader(jobConf);
try {
ValidationUtils.checkArgument(((HoodieCombineRealtimeFileSplit)
split).getRealtimeFileSplits().size() == readers
.size(), "Num Splits does not match number of unique
RecordReaders!");
for (InputSplit rtSplit : ((HoodieCombineRealtimeFileSplit)
split).getRealtimeFileSplits()) {
- LOG.info("Creating new RealtimeRecordReader for split");
- recordReaders.add(
- new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) rtSplit,
jobConf, readers.remove(0)));
+ if (useFileGroupReader) {
+ LOG.info("Creating new HoodieFileGroupReaderRecordReader for split");
+ RecordReader reader = readers.remove(0);
+ ValidationUtils.checkArgument(reader instanceof
HoodieFileGroupReaderBasedRecordReader, reader.toString() + " is not an
instance of HoodieFileGroupReaderBasedRecordReader");
+ recordReaders.add(reader);
+ } else {
+ LOG.info("Creating new RealtimeRecordReader for split");
+ recordReaders.add(
+ new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit)
rtSplit, jobConf, readers.remove(0)));
+ }
}
currentRecordReader = recordReaders.remove(0);
} catch (Exception e) {
@@ -69,9 +83,20 @@ public class HoodieCombineRealtimeRecordReader implements
RecordReader<NullWrita
} else if (recordReaders.size() > 0) {
this.currentRecordReader.close();
this.currentRecordReader = recordReaders.remove(0);
- AbstractRealtimeRecordReader reader =
(AbstractRealtimeRecordReader)currentRecordReader.getReader();
+ RecordReader reader;
+ JobConf jobConf;
+ Path path;
+ if (useFileGroupReader) {
+ reader = currentRecordReader;
+ jobConf = ((HoodieFileGroupReaderBasedRecordReader)
reader).getJobConf();
+ path = ((HoodieFileGroupReaderBasedRecordReader)
reader).getSplit().getPath();
+ } else {
+ reader = ((HoodieRealtimeRecordReader)currentRecordReader).getReader();
+ jobConf = ((AbstractRealtimeRecordReader) reader).getJobConf();
+ path = ((AbstractRealtimeRecordReader) reader).getSplit().getPath();
+ }
// when switch reader, ioctx should be updated
-
IOContextMap.get(reader.getJobConf()).setInputPath(reader.getSplit().getPath());
+ IOContextMap.get(jobConf).setInputPath(path);
return next(key, value);
} else {
return false;
@@ -80,12 +105,20 @@ public class HoodieCombineRealtimeRecordReader implements
RecordReader<NullWrita
@Override
public NullWritable createKey() {
- return this.currentRecordReader.createKey();
+ if (useFileGroupReader) {
+ return ((HoodieFileGroupReaderBasedRecordReader)
this.currentRecordReader).createKey();
+ } else {
+ return ((HoodieRealtimeRecordReader)
this.currentRecordReader).createKey();
+ }
}
@Override
public ArrayWritable createValue() {
- return this.currentRecordReader.createValue();
+ if (useFileGroupReader) {
+ return ((HoodieFileGroupReaderBasedRecordReader)
this.currentRecordReader).createValue();
+ } else {
+ return ((HoodieRealtimeRecordReader)
this.currentRecordReader).createValue();
+ }
}
@Override
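
Either way, the combine reader is driven through the plain RecordReader API;
a minimal consumption sketch (the 'combineReader' variable is an assumed
instance of the class above):

    NullWritable key = combineReader.createKey();
    ArrayWritable value = combineReader.createValue();
    while (combineReader.next(key, value)) {
      // next() transparently advances to the next split's reader and
      // updates the IOContext input path on each switch.
    }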
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index 7e74171c3f9..8d56e77dda9 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
@@ -45,6 +44,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.isLogFile;
+import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.shouldUseFilegroupReader;
+
/**
* Input Format, that provides a real-time view of data in a Hoodie table.
*/
@@ -69,16 +72,20 @@ public class HoodieParquetRealtimeInputFormat extends
HoodieParquetInputFormat {
ValidationUtils.checkArgument(split instanceof RealtimeSplit,
"HoodieRealtimeRecordReader can only work on RealtimeSplit and not
with " + split);
RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+
+ if (shouldUseFilegroupReader(jobConf)) {
+ return super.getRecordReader(realtimeSplit, jobConf, reporter);
+ }
+
// add preCombineKey
- HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-
.setConf(HadoopFSUtils.getStorageConfWithCopy(jobConf)).setBasePath(realtimeSplit.getBasePath()).build();
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(getStorageConf(jobConf)).setBasePath(realtimeSplit.getBasePath()).build();
HoodieTableConfig tableConfig = metaClient.getTableConfig();
addProjectionToJobConf(realtimeSplit, jobConf, tableConfig);
LOG.info("Creating record reader with readCols :" +
jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" +
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
// for log only split, set the parquet reader as empty.
- if (HadoopFSUtils.isLogFile(realtimeSplit.getPath())) {
+ if (isLogFile(realtimeSplit.getPath())) {
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new
HoodieEmptyRecordReader(realtimeSplit, jobConf));
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java
new file mode 100644
index 00000000000..a2da796c6f7
--- /dev/null
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.List;
+import java.util.function.UnaryOperator;
+
+public class HoodieArrayWritableAvroUtils {
+
+ private static final Cache<String, ObjectInspectorCache>
+ OBJECT_INSPECTOR_TABLE_CACHE =
Caffeine.newBuilder().maximumSize(1000).build();
+
+ public static ObjectInspectorCache getCacheForTable(String table, Schema
tableSchema, JobConf jobConf) {
+ // Compute and store the entry on a cache miss so later lookups reuse it.
+ return OBJECT_INSPECTOR_TABLE_CACHE.get(table, t -> new
ObjectInspectorCache(tableSchema, jobConf));
+ }
+
+ private static final Cache<Pair<Schema, Schema>, int[]>
+ PROJECTION_CACHE = Caffeine.newBuilder().maximumSize(1000).build();
+
+ public static int[] getProjection(Schema from, Schema to) {
+ return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> {
+ List<Schema.Field> toFields = to.getFields();
+ int[] newProjection = new int[toFields.size()];
+ for (int i = 0; i < newProjection.length; i++) {
+ newProjection[i] = from.getField(toFields.get(i).name()).pos();
+ }
+ return newProjection;
+ });
+ }
+
+ /**
+ * The projected record keeps the array size of the "from" schema because it
gets recycled,
+ * and the reader will fail if the size changes.
+ */
+ public static UnaryOperator<ArrayWritable> projectRecord(Schema from, Schema
to) {
+ int[] projection = getProjection(from, to);
+ return arrayWritable -> {
+ Writable[] values = new Writable[arrayWritable.get().length];
+ for (int i = 0; i < projection.length; i++) {
+ values[i] = arrayWritable.get()[projection[i]];
+ }
+ arrayWritable.set(values);
+ return arrayWritable;
+ };
+ }
+
+ public static int[] getReverseProjection(Schema from, Schema to) {
+ return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> {
+ List<Schema.Field> fromFields = from.getFields();
+ int[] newProjection = new int[fromFields.size()];
+ for (int i = 0; i < newProjection.length; i++) {
+ newProjection[i] = to.getField(fromFields.get(i).name()).pos();
+ }
+ return newProjection;
+ });
+ }
+
+ /**
+ * After reading and merging are done, the field values must be moved back
+ * into their positions in the original schema.
+ */
+ public static UnaryOperator<ArrayWritable> reverseProject(Schema from,
Schema to) {
+ int[] projection = getReverseProjection(from, to);
+ return arrayWritable -> {
+ Writable[] values = new Writable[to.getFields().size()];
+ for (int i = 0; i < projection.length; i++) {
+ values[projection[i]] = arrayWritable.get()[i];
+ }
+ arrayWritable.set(values);
+ return arrayWritable;
+ };
+ }
+
+ public static Object getWritableValue(ArrayWritable arrayWritable,
ArrayWritableObjectInspector objectInspector, String name) {
+ return objectInspector.getStructFieldData(arrayWritable,
objectInspector.getStructFieldRef(name));
+ }
+}
+
+
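
To make the index arithmetic above concrete, a short worked example with
hypothetical schemas:

    // Hypothetical schemas: from = (a, b, c), to = (c, a).
    // getProjection(from, to) -> {2, 0}
    // projectRecord(from, to) turns [a0, b0, c0] into [c0, a0, null];
    // the array keeps length 3 because the ArrayWritable is recycled.
    // reverseProject(to, from) also yields {2, 0}, so applying it to
    // [c0, a0, null] restores [a0, null, c0] (original positions).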
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index fe88855d458..64dc1f63af8 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -18,7 +18,9 @@
package org.apache.hudi.hadoop.utils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -33,6 +35,7 @@ import
org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
@@ -44,6 +47,8 @@ import
org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimePath;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StoragePath;
@@ -52,8 +57,10 @@ import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
@@ -61,6 +68,8 @@ import
org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.slf4j.Logger;
@@ -540,4 +549,31 @@ public class HoodieInputFormatUtils {
throw new HoodieIOException(String.format("Failed to create instance of
%s", HoodieRealtimeFileSplit.class.getName()), e);
}
}
+
+ public static List<String> getPartitionFieldNames(JobConf jobConf) {
+ String partitionFields =
jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
+ return partitionFields.isEmpty() ? new ArrayList<>() :
Arrays.stream(partitionFields.split("/")).collect(Collectors.toList());
+ }
+
+ public static String getTableBasePath(InputSplit split, JobConf jobConf)
throws IOException {
+ if (split instanceof RealtimeSplit) {
+ RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+ return realtimeSplit.getBasePath();
+ } else {
+ Path inputPath = ((FileSplit) split).getPath();
+ FileSystem fs = inputPath.getFileSystem(jobConf);
+ HoodieStorage storage = new HoodieHadoopStorage(fs);
+ Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage,
convertToStoragePath(inputPath));
+ return tablePath.get().toString();
+ }
+ }
+
+ /**
+ * The file group reader is not used when `schema.on.read` or skip merge is
enabled, because it does not implement them yet.
+ */
+ public static boolean shouldUseFilegroupReader(final JobConf jobConf) {
+ return
jobConf.getBoolean(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue())
+ &&
!jobConf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(),
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue())
+ &&
!jobConf.getBoolean(HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP, false);
+ }
}
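
A minimal sketch of a JobConf that passes all three gates in
shouldUseFilegroupReader (every key below appears in the method above):

    JobConf jobConf = new JobConf();
    // Gate 1: the file group reader is enabled.
    jobConf.setBoolean(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), true);
    // Gate 2: schema-on-read evolution is disabled.
    jobConf.setBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), false);
    // Gate 3: realtime skip-merge is disabled.
    jobConf.setBoolean(HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP, false);
    boolean useIt = HoodieInputFormatUtils.shouldUseFilegroupReader(jobConf); // true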
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
new file mode 100644
index 00000000000..ddcc28851df
--- /dev/null
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * To read a value from an ArrayWritable, an ObjectInspector is needed.
+ * Object inspectors are cached here or created using the column type map.
+ */
+public class ObjectInspectorCache {
+ private final Map<String, TypeInfo> columnTypeMap = new HashMap<>();
+ private final Cache<Schema, ArrayWritableObjectInspector>
+ objectInspectorCache = Caffeine.newBuilder().maximumSize(1000).build();
+
+ public Map<String, TypeInfo> getColumnTypeMap() {
+ return columnTypeMap;
+ }
+
+ public ObjectInspectorCache(Schema tableSchema, JobConf jobConf) {
+ // From AbstractRealtimeRecordReader#prepareHiveAvroSerializer:
+ // Hive appends virtual columns at the end of the column list; those
columns must be removed.
+ // e.g. if the current table is col1, col2, col3, then
+ // jobConf.get(serdeConstants.LIST_COLUMNS) is col1, col2, col3,
BLOCK__OFFSET__INSIDE__FILE, ...
+ Set<String> writerSchemaColNames = tableSchema.getFields().stream().map(f
-> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
+ List<String> columnNameList =
Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
+ List<TypeInfo> columnTypeList =
TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
+
+ int columnNameListLen = columnNameList.size() - 1;
+ for (int i = columnNameListLen; i >= 0; i--) {
+ String lastColName = columnNameList.get(columnNameList.size() - 1);
+ // Virtual columns are only appended at the end of the column list, so it
is safe to break here.
+ if (writerSchemaColNames.contains(lastColName)) {
+ break;
+ }
+ columnNameList.remove(columnNameList.size() - 1);
+ columnTypeList.remove(columnTypeList.size() - 1);
+ }
+
+ // Use columnNameList.size() instead of columnTypeList.size() because the
type list can be longer than the name list.
+ IntStream.range(0, columnNameList.size()).boxed().forEach(i ->
columnTypeMap.put(columnNameList.get(i),
+
TypeInfoUtils.getTypeInfosFromTypeString(columnTypeList.get(i).getQualifiedName()).get(0)));
+
+ StructTypeInfo rowTypeInfo = (StructTypeInfo)
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+ ArrayWritableObjectInspector objectInspector = new
ArrayWritableObjectInspector(rowTypeInfo);
+ objectInspectorCache.put(tableSchema, objectInspector);
+ }
+
+ public ArrayWritableObjectInspector getObjectInspector(Schema schema) {
+ return objectInspectorCache.get(schema, s -> {
+ List<String> columnNameList =
s.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+ List<TypeInfo> columnTypeList =
columnNameList.stream().map(columnTypeMap::get).collect(Collectors.toList());
+ StructTypeInfo rowTypeInfo = (StructTypeInfo)
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+ return new ArrayWritableObjectInspector(rowTypeInfo);
+ });
+
+ }
+
+ public Object getValue(ArrayWritable record, Schema schema, String
fieldName) {
+ ArrayWritableObjectInspector objectInspector = getObjectInspector(schema);
+ return objectInspector.getStructFieldData(record,
objectInspector.getStructFieldRef(fieldName));
+ }
+}
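
A minimal usage sketch, assuming a table schema, a JobConf whose serde column
properties are already set, and a populated ArrayWritable row (the field name
"weight" is borrowed from the test data generator used later in this commit):

    ObjectInspectorCache cache = new ObjectInspectorCache(tableSchema, jobConf);
    // Looks up (and caches) the inspector for the schema, then reads one field.
    Object weight = cache.getValue(row, tableSchema, "weight");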
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 08cd33c2d56..7d7a2eec626 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -368,7 +369,7 @@ public class TestHoodieParquetInputFormat {
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.incremental.use.database is true and hoodie.database.name
is not null or empty"
- + " and the incremental database name is not set, then the
incremental query will not take effect");
+ + " and the incremental database name is not set, then the
incremental query will not take effect");
}
@Test
@@ -403,7 +404,7 @@ public class TestHoodieParquetInputFormat {
metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultStorageConf(),
basePath.toString(), HoodieTableType.COPY_ON_WRITE,
baseFileFormat, HoodieTestUtils.HOODIE_DATABASE);
assertEquals(HoodieTestUtils.HOODIE_DATABASE,
metaClient.getTableConfig().getDatabaseName(),
- String.format("The hoodie.database.name should be %s ",
HoodieTestUtils.HOODIE_DATABASE));
+ String.format("The hoodie.database.name should be %s ",
HoodieTestUtils.HOODIE_DATABASE));
files = inputFormat.listStatus(jobConf);
assertEquals(0, files.length,
@@ -414,7 +415,7 @@ public class TestHoodieParquetInputFormat {
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.incremental.use.database is false and the incremental
database name is set, "
- + "then the incremental query will not take effect");
+ + "then the incremental query will not take effect");
// The configuration with and without database name exists together
InputFormatTestUtil.setupIncremental(jobConf, "1", 1, true);
@@ -422,13 +423,13 @@ public class TestHoodieParquetInputFormat {
files = inputFormat.listStatus(jobConf);
assertEquals(0, files.length,
"When hoodie.incremental.use.database is true, "
- + "We should exclude commit 100 because the returning
incremental pull with start commit time is 100");
+ + "We should exclude commit 100 because the returning incremental
pull with start commit time is 100");
InputFormatTestUtil.setupIncremental(jobConf, "1", 1, false);
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.incremental.use.database is false, "
- + "We should include commit 100 because the returning
incremental pull with start commit time is 1");
+ + "We should include commit 100 because the returning incremental
pull with start commit time is 1");
}
@Test
@@ -679,13 +680,13 @@ public class TestHoodieParquetInputFormat {
try {
// Verify that Validate mode throws error with invalid commit time
- InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "300");
+ InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "300");
inputFormat.listStatus(jobConf);
fail("Expected list status to fail when validate is called with unknown
timestamp");
} catch (HoodieIOException e) {
// expected because validate is called with invalid instantTime
}
-
+
//Creating a new JobConf object because the old one has
hoodie.%.consume.commit set
jobConf = new JobConf();
inputFormat.setConf(jobConf);
@@ -751,7 +752,7 @@ public class TestHoodieParquetInputFormat {
}
private void ensureRecordsInCommit(String msg, String commit, int
expectedNumberOfRecordsInCommit,
- int totalExpected) throws IOException {
+ int totalExpected) throws IOException {
int actualCount = 0;
int totalCount = 0;
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
@@ -777,59 +778,64 @@ public class TestHoodieParquetInputFormat {
@Test
public void testHoodieParquetInputFormatReadTimeType() throws IOException {
- long testTimestampLong = System.currentTimeMillis();
- int testDate = 19116;// 2022-05-04
-
- Schema schema = SchemaTestUtil.getSchemaFromResource(getClass(),
"/test_timetype.avsc");
- String commit = "20160628071126";
- HoodieTestUtils.init(HoodieTestUtils.getDefaultStorageConf(),
basePath.toString(),
- HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET);
- java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016",
"06", "28"));
- String fileId = FSUtils.makeBaseFileName(commit, "1-0-1", "fileid1",
- HoodieFileFormat.PARQUET.getFileExtension());
- try (AvroParquetWriter parquetWriter = new AvroParquetWriter(
- new Path(partitionPath.resolve(fileId).toString()), schema)) {
- GenericData.Record record = new GenericData.Record(schema);
- record.put("test_timestamp", testTimestampLong * 1000);
- record.put("test_long", testTimestampLong * 1000);
- record.put("test_date", testDate);
- record.put("_hoodie_commit_time", commit);
- record.put("_hoodie_commit_seqno", commit + 1);
- parquetWriter.write(record);
- }
-
- jobConf.set(IOConstants.COLUMNS,
"test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno");
- jobConf.set(IOConstants.COLUMNS_TYPES,
"timestamp,bigint,date,string,string");
- jobConf.set(READ_COLUMN_NAMES_CONF_STR,
"test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno");
- InputFormatTestUtil.setupPartition(basePath, partitionPath);
- InputFormatTestUtil.commit(basePath, commit);
- FileInputFormat.setInputPaths(jobConf, partitionPath.toFile().getPath());
+ try {
+ long testTimestampLong = System.currentTimeMillis();
+ int testDate = 19116;// 2022-05-04
+
+ Schema schema = SchemaTestUtil.getSchemaFromResource(getClass(),
"/test_timetype.avsc");
+ String commit = "20160628071126";
+ HoodieTestUtils.init(HoodieTestUtils.getDefaultStorageConf(),
basePath.toString(),
+ HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET);
+ java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016",
"06", "28"));
+ String fileId = FSUtils.makeBaseFileName(commit, "1-0-1", "fileid1",
+ HoodieFileFormat.PARQUET.getFileExtension());
+ try (AvroParquetWriter parquetWriter = new AvroParquetWriter(
+ new Path(partitionPath.resolve(fileId).toString()), schema)) {
+ GenericData.Record record = new GenericData.Record(schema);
+ record.put("test_timestamp", testTimestampLong * 1000);
+ record.put("test_long", testTimestampLong * 1000);
+ record.put("test_date", testDate);
+ record.put("_hoodie_commit_time", commit);
+ record.put("_hoodie_commit_seqno", commit + 1);
+ parquetWriter.write(record);
+ }
- InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
- for (InputSplit split : splits) {
- RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat
- .getRecordReader(split, jobConf, null);
- NullWritable key = recordReader.createKey();
- ArrayWritable writable = recordReader.createValue();
- while (recordReader.next(key, writable)) {
- // test timestamp
- if (HiveVersionInfo.getShortVersion().startsWith("3")) {
- LocalDateTime localDateTime = LocalDateTime.ofInstant(
- Instant.ofEpochMilli(testTimestampLong), ZoneOffset.UTC);
- assertEquals(Timestamp.valueOf(localDateTime).toString(),
String.valueOf(writable.get()[0]));
- } else {
- Date date = new Date();
- date.setTime(testTimestampLong);
- Timestamp actualTime = ((TimestampWritable)
writable.get()[0]).getTimestamp();
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
- assertEquals(dateFormat.format(date), dateFormat.format(actualTime));
+ // This is not a hoodie table, so the file group reader must be disabled.
+ jobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
+ jobConf.set(IOConstants.COLUMNS,
"test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno");
+ jobConf.set(IOConstants.COLUMNS_TYPES,
"timestamp,bigint,date,string,string");
+ jobConf.set(READ_COLUMN_NAMES_CONF_STR,
"test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno");
+ InputFormatTestUtil.setupPartition(basePath, partitionPath);
+ InputFormatTestUtil.commit(basePath, commit);
+ FileInputFormat.setInputPaths(jobConf, partitionPath.toFile().getPath());
+
+ InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+ for (InputSplit split : splits) {
+ RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat
+ .getRecordReader(split, jobConf, null);
+ NullWritable key = recordReader.createKey();
+ ArrayWritable writable = recordReader.createValue();
+ while (recordReader.next(key, writable)) {
+ // test timestamp
+ if (HiveVersionInfo.getShortVersion().startsWith("3")) {
+ LocalDateTime localDateTime = LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(testTimestampLong), ZoneOffset.UTC);
+ assertEquals(Timestamp.valueOf(localDateTime).toString(),
String.valueOf(writable.get()[0]));
+ } else {
+ Date date = new Date();
+ date.setTime(testTimestampLong);
+ Timestamp actualTime = ((TimestampWritable)
writable.get()[0]).getTimestamp();
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
+ assertEquals(dateFormat.format(date),
dateFormat.format(actualTime));
+ }
+ // test long
+ assertEquals(testTimestampLong * 1000, ((LongWritable)
writable.get()[1]).get());
+ // test date
+ assertEquals(LocalDate.ofEpochDay(testDate).toString(),
String.valueOf(writable.get()[2]));
}
- // test long
- assertEquals(testTimestampLong * 1000, ((LongWritable)
writable.get()[1]).get());
- // test date
- assertEquals(LocalDate.ofEpochDay(testDate).toString(),
String.valueOf(writable.get()[2]));
}
- recordReader.close();
+ } finally {
+ jobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
}
}
}
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
index 3371b5efb27..ab907390f88 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -243,7 +244,18 @@ public class TestHoodieCombineHiveInputFormat extends
HoodieCommonTestHarness {
HoodieCombineHiveInputFormat combineHiveInputFormat = new
HoodieCombineHiveInputFormat();
String tripsHiveColumnTypes =
"double,string,string,string,double,double,double,double,double";
- InputFormatTestUtil.setPropsForInputFormat(jobConf, schema,
tripsHiveColumnTypes);
+ List<Schema.Field> fields = schema.getFields();
+ String names =
fields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
+ String positions = fields.stream().map(f ->
String.valueOf(f.pos())).collect(Collectors.joining(","));
+
+ String hiveColumnNames =
fields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
+ hiveColumnNames = hiveColumnNames + ",year,month,day";
+ String modifiedHiveColumnTypes =
HoodieAvroUtils.addMetadataColumnTypes(tripsHiveColumnTypes);
+ modifiedHiveColumnTypes = modifiedHiveColumnTypes +
",string,string,string";
+ jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+ jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES,
modifiedHiveColumnTypes);
+ jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+ jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
// unset META_TABLE_PARTITION_COLUMNS to trigger HUDI-1718
jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
index f982a971062..463ad5a2ebc 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -89,6 +90,7 @@ public class TestHoodieMergeOnReadSnapshotReader {
baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
String.valueOf(1024 * 1024));
baseJobConf.set(serdeConstants.LIST_COLUMNS, COLUMNS);
baseJobConf.set(serdeConstants.LIST_COLUMN_TYPES, COLUMN_TYPES);
+ baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
"false");
storage = new HoodieHadoopStorage(HadoopFSUtils.getFs(new
StoragePath(basePath.toUri()), baseJobConf));
}
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 3ee83a09a3b..b992987c690 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -120,6 +121,7 @@ public class TestHoodieRealtimeRecordReader {
storageConf.set("fs.file.impl",
org.apache.hadoop.fs.LocalFileSystem.class.getName());
baseJobConf = new JobConf(storageConf.unwrap());
baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
String.valueOf(1024 * 1024));
+ baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
"false");
fs = HadoopFSUtils.getFs(basePath.toUri().toString(), baseJobConf);
storage = new HoodieHadoopStorage(fs);
}
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java
new file mode 100644
index 00000000000..d7b4a93009b
--- /dev/null
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieArrayWritableAvroUtils {
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+ Schema tableSchema = HoodieTestDataGenerator.AVRO_SCHEMA;
+ ObjectInspectorCache objectInspectorCache;
+
+ @BeforeEach
+ public void setup() {
+ List<Schema.Field> fields = tableSchema.getFields();
+ Configuration conf = HoodieTestUtils.getDefaultStorageConf().unwrap();
+ JobConf jobConf = new JobConf(conf);
+ jobConf.set(serdeConstants.LIST_COLUMNS,
fields.stream().map(Schema.Field::name).collect(Collectors.joining(",")));
+ jobConf.set(serdeConstants.LIST_COLUMN_TYPES,
HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES);
+ objectInspectorCache = new
ObjectInspectorCache(HoodieTestDataGenerator.AVRO_SCHEMA, jobConf);
+ }
+
+ @Test
+ public void testProjection() {
+ Schema from = tableSchema;
+ Schema to = HoodieAvroUtils.generateProjectionSchema(from,
Arrays.asList("trip_type", "current_ts", "weight"));
+ UnaryOperator<ArrayWritable> projection =
HoodieArrayWritableAvroUtils.projectRecord(from, to);
+ UnaryOperator<ArrayWritable> reverseProjection =
HoodieArrayWritableAvroUtils.reverseProject(to, from);
+
+ //We reuse the ArrayWritable, so we need to get the values before
projecting
+ ArrayWritable record =
convertArrayWritable(dataGen.generateGenericRecord());
+ Object tripType = objectInspectorCache.getValue(record, from, "trip_type");
+ Object currentTs = objectInspectorCache.getValue(record, from,
"current_ts");
+ Object weight = objectInspectorCache.getValue(record, from, "weight");
+
+ //Make sure the projected fields can be read
+ ArrayWritable projectedRecord = projection.apply(record);
+ assertEquals(tripType, objectInspectorCache.getValue(projectedRecord, to,
"trip_type"));
+ assertEquals(currentTs, objectInspectorCache.getValue(projectedRecord, to,
"current_ts"));
+ assertEquals(weight, objectInspectorCache.getValue(projectedRecord, to,
"weight"));
+
+ // After the reverse projection the fields are back in their original
spots, but only the fields we set can be read.
+ // Therefore, we can only check the 3 fields that were in the projection.
+ ArrayWritable reverseProjected = reverseProjection.apply(projectedRecord);
+ assertEquals(tripType, objectInspectorCache.getValue(reverseProjected,
from, "trip_type"));
+ assertEquals(currentTs, objectInspectorCache.getValue(reverseProjected,
from, "current_ts"));
+ assertEquals(weight, objectInspectorCache.getValue(reverseProjected, from,
"weight"));
+ }
+
+ private static ArrayWritable convertArrayWritable(GenericRecord record) {
+ return (ArrayWritable)
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, record.getSchema(),
false);
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index b6795bc2a2a..99fcdcbf8a3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -253,6 +253,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
long timestamp = Instant.now().toEpochMilli();
Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords,
partitions, bootstrapBasePath);
HoodieWriteConfig config = getConfigBuilder(schema.toString())
+ .withPreCombineField("timestamp")
.withAutoCommit(true)
.withSchema(schema.toString())
.withKeyGenerator(keyGeneratorClass)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index a5a45cabf81..806f7754423 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -19,6 +19,7 @@
package org.apache.hudi.functional;
import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
@@ -106,6 +107,7 @@ public class TestHiveTableSchemaEvolution {
spark.sql(String.format("alter table %s rename column col2 to col2_new",
tableName));
JobConf jobConf = new JobConf();
+ jobConf.set(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true");
jobConf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
"col1,col2_new");
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
index 723d2389d22..2f2d2ba0efa 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
@@ -219,7 +219,7 @@ public class TestSparkConsistentBucketClustering extends
HoodieSparkClientTestHa
*/
@ParameterizedTest
@MethodSource("configParamsForSorting")
- public void testClusteringColumnSort(String sortColumn, boolean
rowWriterEnable) throws IOException {
+ public void testClusteringColumnSort(String sortColumn, boolean
rowWriterEnable) throws Exception {
Map<String, String> options = new HashMap<>();
// Record key is handled specially
if (sortColumn.equals("_row_key")) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
index e6c388b3e3b..e00d7290094 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
@@ -32,12 +32,14 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doNothing;
@@ -56,8 +58,15 @@ public class TestHoodieStreamerUtils extends
UtilitiesTestBase {
initTestServices();
}
+ private static Stream<Arguments> validRecordTypes() {
+ Stream.Builder<Arguments> b = Stream.builder();
+ b.add(Arguments.of(HoodieRecordType.SPARK));
+ b.add(Arguments.of(HoodieRecordType.AVRO));
+ return b.build();
+ }
+
@ParameterizedTest
- @EnumSource(HoodieRecordType.class)
+ @MethodSource("validRecordTypes")
public void testCreateHoodieRecordsWithError(HoodieRecordType recordType) {
Schema schema = new Schema.Parser().parse(SCHEMA_STRING);
JavaRDD<GenericRecord> recordRdd =
jsc.parallelize(Collections.singletonList(1)).map(i -> {