This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e7c8df7e8b [HUDI-4250][HUDI-4202] Optimize performance of Column Stats Index reading in Data Skipping (#5746)
e7c8df7e8b is described below
commit e7c8df7e8b886261a9b0da7696a75e73eca04f11
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Mon Jul 25 15:36:12 2022 -0700
[HUDI-4250][HUDI-4202] Optimize performance of Column Stats Index reading in Data Skipping (#5746)

We provide an alternative way of fetching the Column Stats Index within the reading process itself, avoiding the penalty of the heavier-weight execution otherwise scheduled through the Spark engine.
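As an illustration of the decision this change introduces, the sketch below shows one way a reader could choose between the two Column Stats Index processing modes. The object and method names (ColumnStatsReadPlanner, planIndexRead) are hypothetical and are not part of this commit; the "in-memory"/"engine" override values, the 100,000-row default threshold, and the file-slices-times-columns heuristic mirror the configs and the ColumnStatsIndexSupport.shouldReadInMemory logic added in the diff below.

// Hypothetical sketch only -- names are illustrative, not the API added by this commit.
object ColumnStatsReadPlanner {

  // Mirrors METADATA_PREFIX + ".index.column.stats.inMemory.projection.threshold" (default 100000)
  val DefaultInMemoryProjectionThreshold: Int = 100000

  sealed trait ProcessingMode
  case object InMemory extends ProcessingMode // read and merge within the invoking (driver) process
  case object Engine extends ProcessingMode   // parallelize the read across the cluster via Spark

  def planIndexRead(fileSlicesCount: Int,
                    queryReferencedColumns: Seq[String],
                    modeOverride: Option[String] = None,
                    threshold: Int = DefaultInMemoryProjectionThreshold): ProcessingMode =
    modeOverride match {
      // Explicit override, per METADATA_PREFIX + ".index.column.stats.processing.mode.override"
      case Some("in-memory") => InMemory
      case Some("engine")    => Engine
      case _ =>
        // Expected projection size (rows) ~= resolved file slices x query-referenced columns
        if (fileSlicesCount.toLong * queryReferencedColumns.length < threshold) InMemory
        else Engine
    }
}

With an explicit override the heuristic is bypassed entirely; otherwise small projections are processed on the driver and larger ones fall back to engine execution.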
---
.../scala/org/apache/hudi/util/JFunction.scala | 32 +-
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 9 +
.../functional/TestHoodieBackedMetadata.java | 6 +-
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 12 +-
.../hudi/common/config/HoodieMetadataConfig.java | 28 ++
.../metadata/FileSystemBackedTableMetadata.java | 2 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 82 ++--
.../apache/hudi/metadata/HoodieTableMetadata.java | 3 +-
.../hudi/source/stats/ColumnStatsIndices.java | 2 +-
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 481 ++++++++++++++-------
.../org/apache/hudi/HoodieCatalystUtils.scala | 65 +++
.../hudi/HoodieDatasetBulkInsertHelper.scala | 7 +-
.../scala/org/apache/hudi/HoodieDatasetUtils.scala | 45 --
.../scala/org/apache/hudi/HoodieFileIndex.scala | 100 ++---
...nsafeRDDUtils.scala => HoodieUnsafeUtils.scala} | 45 +-
.../org/apache/hudi/TestHoodieFileIndex.scala | 10 +-
.../hudi/functional/TestColumnStatsIndex.scala | 168 +++----
.../functional/TestParquetColumnProjection.scala | 4 +-
.../apache/spark/sql/adapter/Spark2Adapter.scala | 22 +
.../spark/sql/adapter/BaseSpark3Adapter.scala | 22 +
20 files changed, 744 insertions(+), 401 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
index 4a7dca8408..8a612f4da2 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
@@ -17,15 +17,43 @@
package org.apache.hudi.util
+import org.apache.hudi.common.function.{SerializableFunction, SerializablePairFunction}
+import org.apache.hudi.common.util.collection
+
+import scala.language.implicitConversions
+
/**
 * Utility allowing for seamless conversion b/w Java/Scala functional primitives
*/
object JFunction {
- def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
+ ////////////////////////////////////////////////////////////
+ // From Java to Scala
+ ////////////////////////////////////////////////////////////
+
+ implicit def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
(t: T) => f.apply(t)
- def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
+ ////////////////////////////////////////////////////////////
+ // From Scala to Java
+ ////////////////////////////////////////////////////////////
+
+  implicit def toJavaFunction[T, R](f: Function[T, R]): java.util.function.Function[T, R] =
+    new java.util.function.Function[T, R] {
+      override def apply(t: T): R = f.apply(t)
+    }
+
+  implicit def toJavaSerializableFunction[T, R](f: Function[T, R]): SerializableFunction[T, R] =
+    new SerializableFunction[T, R] {
+      override def apply(t: T): R = f.apply(t)
+    }
+
+  implicit def toJavaSerializablePairFunction[T, K, V](f: Function[T, collection.Pair[K, V]]): SerializablePairFunction[T, K, V] =
+    new SerializablePairFunction[T, K, V] {
+      override def call(t: T): collection.Pair[K, V] = f.apply(t)
+    }
+
+ implicit def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
new java.util.function.Consumer[T] {
override def accept(t: T): Unit = f.apply(t)
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index cd30528798..24f4e6117a 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -27,12 +27,16 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression,
InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan,
SubqueryAlias}
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FilePartition,
LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils,
HoodieCatalystPlansUtils, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
import java.util.Locale
@@ -138,4 +142,9 @@ trait SparkAdapter extends Serializable {
* TODO move to HoodieCatalystExpressionUtils
*/
def createInterpretedPredicate(e: Expression): InterpretedPredicate
+
+ /**
+ * Converts instance of [[StorageLevel]] to a corresponding string
+ */
+ def convertStorageLevelToString(level: StorageLevel): String
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index e10c372be6..8828ceab6d 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1504,7 +1504,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// prefix search for column (_hoodie_record_key)
ColumnIndexID columnIndexID = new
ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
List<HoodieRecord<HoodieMetadataPayload>> result =
tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString()),
-
MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
true).collectAsList();
// there are 3 partitions in total and 2 commits. total entries should
be 6.
assertEquals(result.size(), 6);
@@ -1515,7 +1515,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// prefix search for col(_hoodie_record_key) and first partition. only 2
files should be matched
PartitionIndexID partitionIndexID = new
PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
result =
tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
-
MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
true).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
result.forEach(entry -> {
@@ -1534,7 +1534,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// prefix search for column {commit time} and first partition
columnIndexID = new
ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
result =
tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
-
MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
true).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index ec70653b9c..0b2c34618e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -38,6 +38,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hadoop.CachingPath;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -62,7 +63,7 @@ import java.util.stream.Collectors;
* <li>Query instant/range</li>
* </ul>
*/
-public abstract class BaseHoodieTableFileIndex {
+public abstract class BaseHoodieTableFileIndex {
private static final Logger LOG =
LogManager.getLogger(BaseHoodieTableFileIndex.class);
@@ -166,6 +167,11 @@ public abstract class BaseHoodieTableFileIndex {
.collect(Collectors.toMap(e -> e.getKey().path, Map.Entry::getValue));
}
+ public int getFileSlicesCount() {
+ return cachedAllInputFileSlices.values().stream()
+ .mapToInt(List::size).sum();
+ }
+
protected List<PartitionPath> getAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(new Path(basePath),
path))
@@ -349,10 +355,10 @@ public abstract class BaseHoodieTableFileIndex {
Path fullPartitionPath(String basePath) {
if (!path.isEmpty()) {
- return new Path(basePath, path);
+ return new CachingPath(basePath, path);
}
- return new Path(basePath);
+ return new CachingPath(basePath);
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 2cd08b9ae9..b16373ef83 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -187,6 +187,26 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Comma-separated list of columns for which column
stats index will be built. If not set, all columns will be indexed");
+ public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY =
"in-memory";
+ public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE =
"engine";
+
+  public static final ConfigProperty<String> COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE = ConfigProperty
+      .key(METADATA_PREFIX + ".index.column.stats.processing.mode.override")
+      .noDefaultValue()
+      .withValidValues(COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY, COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE)
+      .sinceVersion("0.12.0")
+      .withDocumentation("By default Column Stats Index is automatically determining whether it should be read and processed either"
+          + "'in-memory' (w/in executing process) or using Spark (on a cluster), based on some factors like the size of the Index "
+          + "and how many columns are read. This config allows to override this behavior.");
+
+  public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD = ConfigProperty
+      .key(METADATA_PREFIX + ".index.column.stats.inMemory.projection.threshold")
+      .defaultValue(100000)
+      .sinceVersion("0.12.0")
+      .withDocumentation("When reading Column Stats Index, if the size of the expected resulting projection is below the in-memory"
+          + " threshold (counted by the # of rows), it will be attempted to be loaded \"in-memory\" (ie not using the execution engine"
+          + " like Spark, Flink, etc). If the value is above the threshold execution engine will be used to compose the projection.");
+
public static final ConfigProperty<String> BLOOM_FILTER_INDEX_FOR_COLUMNS =
ConfigProperty
.key(METADATA_PREFIX + ".index.bloom.filter.column.list")
.noDefaultValue()
@@ -246,6 +266,14 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return StringUtils.split(getString(COLUMN_STATS_INDEX_FOR_COLUMNS),
CONFIG_VALUES_DELIMITER);
}
+ public String getColumnStatsIndexProcessingModeOverride() {
+ return getString(COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE);
+ }
+
+ public Integer getColumnStatsIndexInMemoryProjectionThreshold() {
+ return getIntOrDefault(COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD);
+ }
+
public List<String> getColumnsEnabledForBloomFilterIndex() {
return StringUtils.split(getString(BLOOM_FILTER_INDEX_FOR_COLUMNS),
CONFIG_VALUES_DELIMITER);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index f029995ba0..9877755b3c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -168,7 +168,7 @@ public class FileSystemBackedTableMetadata implements
HoodieTableMetadata {
}
@Override
- public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName) {
+ public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName, boolean
shouldLoadInMemory) {
throw new HoodieMetadataException("Unsupported operation:
getRecordsByKeyPrefixes!");
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index e96889f044..f8a0389da3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.FileSlice;
@@ -143,10 +144,11 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
@Override
   public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
-                                                                                  String partitionName) {
+                                                                                  String partitionName,
+                                                                                  boolean shouldLoadInMemory) {
// Sort the columns so that keys are looked up in order
- List<String> sortedkeyPrefixes = new ArrayList<>(keyPrefixes);
- Collections.sort(sortedkeyPrefixes);
+ List<String> sortedKeyPrefixes = new ArrayList<>(keyPrefixes);
+ Collections.sort(sortedKeyPrefixes);
// NOTE: Since we partition records to a particular file-group by full
key, we will have
// to scan all file-groups for all key-prefixes as each of these
might contain some
@@ -154,44 +156,44 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
List<FileSlice> partitionFileSlices =
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
partitionName);
- return engineContext.parallelize(partitionFileSlices)
- .flatMap(
- (SerializableFunction<FileSlice, Iterator<Pair<String,
Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> {
- // NOTE: Since this will be executed by executors, we can't
access previously cached
- // readers, and therefore have to always open new ones
- Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>
readers =
- openReaders(partitionName, fileSlice);
- try {
- List<Long> timings = new ArrayList<>();
-
- HoodieFileReader baseFileReader = readers.getKey();
- HoodieMetadataMergedLogRecordReader logRecordScanner =
readers.getRight();
-
- if (baseFileReader == null && logRecordScanner == null) {
- // TODO: what do we do if both does not exist? should we
throw an exception and let caller do the fallback ?
- return Collections.emptyIterator();
- }
-
- boolean fullKeys = false;
-
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>>
logRecords =
- readLogRecords(logRecordScanner, sortedkeyPrefixes,
fullKeys, timings);
-
- List<Pair<String,
Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
- readFromBaseAndMergeWithLogRecords(baseFileReader,
sortedkeyPrefixes, fullKeys, logRecords, timings, partitionName);
-
- LOG.debug(String.format("Metadata read for %s keys took
[baseFileRead, logMerge] %s ms",
- sortedkeyPrefixes.size(), timings));
-
- return mergedRecords.iterator();
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from
metadata table for " + sortedkeyPrefixes.size() + " key : ", ioe);
- } finally {
- closeReader(readers);
- }
+ return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) :
engineContext.parallelize(partitionFileSlices))
+ .flatMap((SerializableFunction<FileSlice,
Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
+ // NOTE: Since this will be executed by executors, we can't access
previously cached
+ // readers, and therefore have to always open new ones
+ Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+ openReaders(partitionName, fileSlice);
+
+ try {
+ List<Long> timings = new ArrayList<>();
+
+ HoodieFileReader baseFileReader = readers.getKey();
+ HoodieMetadataMergedLogRecordReader logRecordScanner =
readers.getRight();
+
+ if (baseFileReader == null && logRecordScanner == null) {
+ // TODO: what do we do if both does not exist? should we throw
an exception and let caller do the fallback ?
+ return Collections.emptyIterator();
}
- )
- .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+
+ boolean fullKeys = false;
+
+ Map<String, Option<HoodieRecord<HoodieMetadataPayload>>>
logRecords =
+ readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys,
timings);
+
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
mergedRecords =
+ readFromBaseAndMergeWithLogRecords(baseFileReader,
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+
+ LOG.debug(String.format("Metadata read for %s keys took
[baseFileRead, logMerge] %s ms",
+ sortedKeyPrefixes.size(), timings));
+
+ return mergedRecords.stream()
+ .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+ .iterator();
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from metadata
table for " + sortedKeyPrefixes.size() + " key : ", ioe);
+ } finally {
+ closeReader(readers);
+ }
+ })
.filter(Objects::nonNull);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index a059b57845..ae871e3be0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -170,7 +170,8 @@ public interface HoodieTableMetadata extends Serializable,
AutoCloseable {
* @return {@link HoodieData} of {@link HoodieRecord}s with records matching
the passed in key prefixes.
*/
  HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
-                                                                          String partitionName);
+                                                                          String partitionName,
+                                                                          boolean shouldLoadInMemory);
/**
* Get the instant time to which the metadata is synced w.r.t data timeline.
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
index 8ec0eafde3..75e10341db 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
@@ -319,7 +319,7 @@ public class ColumnStatsIndices {
.map(colName -> new
ColumnIndexID(colName).asBase64EncodedString()).collect(Collectors.toList());
HoodieData<HoodieRecord<HoodieMetadataPayload>> records =
- metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames,
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS);
+ metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames,
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, false);
org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter
converter =
AvroToRowDataConverters.createRowConverter((RowType)
METADATA_DATA_TYPE.getLogicalType());
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index b1e03f86ff..58511f791e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -17,66 +17,153 @@
package org.apache.hudi
-import org.apache.avro.Schema.Parser
-import org.apache.avro.generic.GenericRecord
-import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema,
deserialize, metadataRecordSchemaString, metadataRecordStructType,
tryUnpackNonNullVal}
+import org.apache.avro.Conversions.DecimalConversion
+import org.apache.avro.generic.GenericData
+import org.apache.hudi.ColumnStatsIndexSupport._
+import org.apache.hudi.HoodieCatalystUtils.{withPersistedData,
withPersistedDataset}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.avro.model.HoodieMetadataRecord
+import org.apache.hudi.avro.model._
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.collection
import org.apache.hudi.common.util.hash.ColumnIndexID
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata,
HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.hudi.util.JFunction
import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.rdd.RDD
+import
org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows,
createDataFrameFromRDD, createDataFrameFromRows}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, HoodieUnsafeRDDUtils, Row,
SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.collection.immutable.TreeSet
+import scala.collection.mutable.ListBuffer
+import scala.collection.parallel.mutable.ParHashMap
+
+class ColumnStatsIndexSupport(spark: SparkSession,
+ tableSchema: StructType,
+ @transient metadataConfig: HoodieMetadataConfig,
+ @transient metaClient: HoodieTableMetaClient,
+ allowCaching: Boolean = false) {
+
+ @transient private lazy val engineCtx = new HoodieSparkEngineContext(new
JavaSparkContext(spark.sparkContext))
+ @transient private lazy val metadataTable: HoodieTableMetadata =
+ HoodieTableMetadata.create(engineCtx, metadataConfig,
metaClient.getBasePathV2.toString,
FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
+
+ @transient private lazy val cachedColumnStatsIndexViews:
ParHashMap[Seq[String], DataFrame] = ParHashMap()
+
+ // NOTE: Since [[metadataConfig]] is transient this has to be eagerly
persisted, before this will be passed
+ // on to the executor
+ private val inMemoryProjectionThreshold =
metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
+
+ private lazy val indexedColumns: Set[String] = {
+ val customIndexedColumns =
metadataConfig.getColumnsEnabledForColumnStatsIndex
+ // Column Stats Index could index either
+ // - The whole table
+ // - Only configured columns
+ if (customIndexedColumns.isEmpty) {
+ tableSchema.fieldNames.toSet
+ } else {
+ customIndexedColumns.asScala.toSet
+ }
+ }
-/**
- * Mixin trait abstracting away heavy-lifting of interactions with Metadata
Table's Column Stats Index,
- * providing convenient interfaces to read it, transpose, etc
- */
-trait ColumnStatsIndexSupport extends SparkAdapterSupport {
-
- def readColumnStatsIndex(spark: SparkSession,
- tableBasePath: String,
- metadataConfig: HoodieMetadataConfig,
- targetColumns: Seq[String] = Seq.empty): DataFrame
= {
- val targetColStatsIndexColumns = Seq(
- HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-
- val requiredMetadataIndexColumns =
- (targetColStatsIndexColumns :+
HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
- s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
-
- val metadataTableDF: DataFrame = {
- // NOTE: If specific columns have been provided, we can considerably
trim down amount of data fetched
- // by only fetching Column Stats Index records pertaining to the
requested columns.
- // Otherwise we fallback to read whole Column Stats Index
- if (targetColumns.nonEmpty) {
- readColumnStatsIndexForColumnsInternal(spark, targetColumns,
metadataConfig, tableBasePath)
- } else {
- readFullColumnStatsIndexInternal(spark, metadataConfig, tableBasePath)
- }
+ /**
+ * Returns true in cases when Column Stats Index is built and available as
standalone partition
+ * w/in the Metadata Table
+ */
+ def isIndexAvailable: Boolean = {
+ checkState(metadataConfig.enabled, "Metadata Table support has to be
enabled")
+
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+ }
+
+ /**
+ * Determines whether it would be more optimal to read Column Stats Index a)
in-memory of the invoking process,
+ * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
+ */
+ def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns:
Seq[String]): Boolean = {
+ Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
+ case Some(mode) =>
+ mode ==
HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
+ case None =>
+ fileIndex.getFileSlicesCount * queryReferencedColumns.length <
inMemoryProjectionThreshold
+ }
+ }
+
+ /**
+ * Loads view of the Column Stats Index in a transposed format where single
row coalesces every columns'
+ * statistics for a single file, returning it as [[DataFrame]]
+ *
+ * Please check out scala-doc of the [[transpose]] method explaining this
view in more details
+ */
+ def loadTransposed[T](targetColumns: Seq[String], shouldReadInMemory:
Boolean)(block: DataFrame => T): T = {
+ cachedColumnStatsIndexViews.get(targetColumns) match {
+ case Some(cachedDF) =>
+ block(cachedDF)
+
+ case None =>
+ val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+ loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+
+ withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
+ val (transposedRows, indexSchema) = transpose(colStatsRecords,
targetColumns)
+ val df = if (shouldReadInMemory) {
+ // NOTE: This will instantiate a [[Dataset]] backed by
[[LocalRelation]] holding all of the rows
+ // of the transposed table in memory, facilitating execution
of the subsequently chained operations
+ // on it locally (on the driver; all such operations are
actually going to be performed by Spark's
+ // Optimizer)
+ createDataFrameFromRows(spark,
transposedRows.collectAsList().asScala, indexSchema)
+ } else {
+ val rdd = HoodieJavaRDD.getJavaRDD(transposedRows)
+ spark.createDataFrame(rdd, indexSchema)
+ }
+
+ if (allowCaching) {
+ cachedColumnStatsIndexViews.put(targetColumns, df)
+ // NOTE: Instead of collecting the rows from the index and hold
them in memory, we instead rely
+ // on Spark as (potentially distributed) cache managing data
lifecycle, while we simply keep
+ // the referenced to persisted [[DataFrame]] instance
+ df.persist(StorageLevel.MEMORY_ONLY)
+
+ block(df)
+ } else {
+ withPersistedDataset(df) {
+ block(df)
+ }
+ }
+ }
}
+ }
- val colStatsDF =
metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
- .select(requiredMetadataIndexColumns.map(col): _*)
+ /**
+ * Loads a view of the Column Stats Index in a raw format, returning it as
[[DataFrame]]
+ *
+ * Please check out scala-doc of the [[transpose]] method explaining this
view in more details
+ */
+ def load(targetColumns: Seq[String] = Seq.empty, shouldReadInMemory: Boolean
= false): DataFrame = {
+ // NOTE: If specific columns have been provided, we can considerably trim
down amount of data fetched
+ // by only fetching Column Stats Index records pertaining to the
requested columns.
+ // Otherwise we fallback to read whole Column Stats Index
+ if (targetColumns.nonEmpty) {
+ loadColumnStatsIndexForColumnsInternal(targetColumns, shouldReadInMemory)
+ } else {
+ loadFullColumnStatsIndexInternal()
+ }
+ }
- colStatsDF
+ def invalidateCaches(): Unit = {
+ cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
+ cachedColumnStatsIndexViews.clear()
}
/**
@@ -112,154 +199,184 @@ trait ColumnStatsIndexSupport extends
SparkAdapterSupport {
* column references from the filtering expressions, and only
transpose records corresponding to the
* columns referenced in those
*
- * @param spark Spark session ref
- * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
+ * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing
raw Column Stats Index records
* @param queryColumns target columns to be included into the final table
- * @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
- def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame,
queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
- val colStatsSchema = colStatsDF.schema
- val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
- case (field, ordinal) => (field.name, ordinal)
- }).toMap
-
+ private def transpose(colStatsRecords:
HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]):
(HoodieData[Row], StructType) = {
val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
-
- val colNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
- val minValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
- val maxValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
- val fileNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- val nullCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
- val valueCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-
- // NOTE: We have to collect list of indexed columns to make sure we
properly align the rows
- // w/in the transposed dataset: since some files might not have all
of the columns indexed
- // either due to the Column Stats Index config changes, schema
evolution, etc, we have
- // to make sure that all of the rows w/in transposed data-frame are
properly padded (with null
- // values) for such file-column combinations
- val indexedColumns: Seq[String] = colStatsDF.rdd.map(row =>
row.getString(colNameOrdinal)).distinct().collect()
-
// NOTE: We're sorting the columns to make sure final index schema matches
layout
// of the transposed table
- val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns):
_*)
-
- val transposedRDD = colStatsDF.rdd
- .filter(row =>
sortedTargetColumns.contains(row.getString(colNameOrdinal)))
- .map { row =>
- if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
+ val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
+ val sortedTargetColumns = sortedTargetColumnsSet.toSeq
+
+ // NOTE: This is a trick to avoid pulling all of
[[ColumnStatsIndexSupport]] object into the lambdas'
+ // closures below
+ val indexedColumns = this.indexedColumns
+
+ // Here we perform complex transformation which requires us to modify the
layout of the rows
+ // of the dataset, and therefore we rely on low-level RDD API to avoid
incurring encoding/decoding
+ // penalty of the [[Dataset]], since it's required to adhere to its schema
at all times, while
+ // RDDs are not;
+ val transposedRows: HoodieData[Row] = colStatsRecords
+ // NOTE: Explicit conversion is required for Scala 2.11
+ .filter(JFunction.toJavaSerializableFunction(r =>
sortedTargetColumnsSet.contains(r.getColumnName)))
+ .mapToPair(JFunction.toJavaSerializablePairFunction(r => {
+ if (r.getMinValue == null && r.getMaxValue == null) {
// Corresponding row could be null in either of the 2 cases
// - Column contains only null values (in that case both min/max
have to be nulls)
// - This is a stubbed Column Stats record (used as a tombstone)
- row
+ collection.Pair.of(r.getFileName, r)
} else {
- val minValueStruct = row.getAs[Row](minValueOrdinal)
- val maxValueStruct = row.getAs[Row](maxValueOrdinal)
+ val minValueWrapper = r.getMinValue
+ val maxValueWrapper = r.getMaxValue
- checkState(minValueStruct != null && maxValueStruct != null,
"Invalid Column Stats record: either both min/max have to be null, or both have
to be non-null")
+ checkState(minValueWrapper != null && maxValueWrapper != null,
"Invalid Column Stats record: either both min/max have to be null, or both have
to be non-null")
- val colName = row.getString(colNameOrdinal)
+ val colName = r.getColumnName
val colType = tableSchemaFieldMap(colName).dataType
- val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
- val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
- val rowValsSeq = row.toSeq.toArray
+ val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper),
colType)
+ val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper),
colType)
+
// Update min-/max-value structs w/ unwrapped values in-place
- rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
- rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
+ r.setMinValue(minValue)
+ r.setMaxValue(maxValue)
- Row(rowValsSeq: _*)
+ collection.Pair.of(r.getFileName, r)
}
- }
- .groupBy(r => r.getString(fileNameOrdinal))
- .foldByKey(Seq[Row]()) {
- case (_, columnRowsSeq) =>
- // Rows seq is always non-empty (otherwise it won't be grouped into)
- val fileName = columnRowsSeq.head.get(fileNameOrdinal)
- val valueCount = columnRowsSeq.head.get(valueCountOrdinal)
-
- // To properly align individual rows (corresponding to a file) w/in
the transposed projection, we need
- // to align existing column-stats for individual file with the list
of expected ones for the
- // whole transposed projection (a superset of all files)
- val columnRowsMap = columnRowsSeq.map(row =>
(row.getString(colNameOrdinal), row)).toMap
- val alignedColumnRowsSeq =
sortedTargetColumns.toSeq.map(columnRowsMap.get)
-
- val coalescedRowValuesSeq =
- alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
- case (acc, opt) =>
- opt match {
- case Some(columnStatsRow) =>
- acc ++ Seq(minValueOrdinal, maxValueOrdinal,
nullCountOrdinal).map(ord => columnStatsRow.get(ord))
- case None =>
- // NOTE: Since we're assuming missing column to
essentially contain exclusively
- // null values, we set null-count to be equal to
value-count (this behavior is
- // consistent with reading non-existent columns from
Parquet)
- acc ++ Seq(null, null, valueCount)
- }
- }
-
- Seq(Row(coalescedRowValuesSeq:_*))
- }
- .values
- .flatMap(it => it)
+ }))
+ .groupByKey()
+ .map(JFunction.toJavaSerializableFunction(p => {
+ val columnRecordsSeq: Seq[HoodieMetadataColumnStats] =
p.getValue.asScala.toSeq
+ val fileName: String = p.getKey
+ val valueCount: Long = columnRecordsSeq.head.getValueCount
+
+ // To properly align individual rows (corresponding to a file) w/in
the transposed projection, we need
+ // to align existing column-stats for individual file with the list of
expected ones for the
+ // whole transposed projection (a superset of all files)
+ val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName,
r)).toMap
+ val alignedColStatRecordsSeq =
sortedTargetColumns.map(columnRecordsMap.get)
+
+ val coalescedRowValuesSeq =
+ alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName,
valueCount)) {
+ case (acc, opt) =>
+ opt match {
+ case Some(colStatRecord) =>
+ acc ++= Seq(colStatRecord.getMinValue,
colStatRecord.getMaxValue, colStatRecord.getNullCount)
+ case None =>
+ // NOTE: This could occur in either of the following cases:
+ // 1. Column is not indexed in Column Stats Index: in
this case we won't be returning
+ // any statistics for such column (ie all stats will
be null)
+ // 2. Particular file does not have this particular
column (which is indexed by Column Stats Index):
+ // in this case we're assuming missing column to
essentially contain exclusively
+ // null values, we set min/max values as null and
null-count to be equal to value-count (this
+ // behavior is consistent with reading non-existent
columns from Parquet)
+ //
+ // This is a way to determine current column's index without
explicit iteration (we're adding 3 stats / column)
+ val idx = acc.length / 3
+ val colName = sortedTargetColumns(idx)
+ val indexed = indexedColumns.contains(colName)
+
+ val nullCount = if (indexed) valueCount else null
+
+ acc ++= Seq(null, null, nullCount)
+ }
+ }
+
+ Row(coalescedRowValuesSeq:_*)
+ }))
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout: hence, we cherry-pick individual columns
// instead of simply filtering in the ones we're interested in the
schema
- val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq,
tableSchema)
+ val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
+ (transposedRows, indexSchema)
+ }
- spark.createDataFrame(transposedRDD, indexSchema)
+ private def loadColumnStatsIndexForColumnsInternal(targetColumns:
Seq[String], shouldReadInMemory: Boolean): DataFrame = {
+ val colStatsDF = {
+ val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+ // NOTE: Explicit conversion is required for Scala 2.11
+ val catalystRows: HoodieData[InternalRow] =
colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
+ val converter =
AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$,
columnStatsRecordStructType)
+ it.asScala.map(r => converter(r).orNull).asJava
+ }), false)
+
+ if (shouldReadInMemory) {
+ // NOTE: This will instantiate a [[Dataset]] backed by
[[LocalRelation]] holding all of the rows
+ // of the transposed table in memory, facilitating execution of
the subsequently chained operations
+ // on it locally (on the driver; all such operations are
actually going to be performed by Spark's
+ // Optimizer)
+ createDataFrameFromInternalRows(spark,
catalystRows.collectAsList().asScala, columnStatsRecordStructType)
+ } else {
+ createDataFrameFromRDD(spark, HoodieJavaRDD.getJavaRDD(catalystRows),
columnStatsRecordStructType)
+ }
+ }
+
+ colStatsDF.select(targetColumnStatsIndexColumns.map(col): _*)
}
- private def readFullColumnStatsIndexInternal(spark: SparkSession,
metadataConfig: HoodieMetadataConfig, tableBasePath: String): DataFrame = {
- val metadataTablePath =
HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
+ private def loadColumnStatsIndexRecords(targetColumns: Seq[String],
shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
+ // Read Metadata Table's Column Stats Index records into [[HoodieData]]
container by
+ // - Fetching the records from CSI by key-prefixes (encoded column
names)
+ // - Extracting [[HoodieMetadataColumnStats]] records
+ // - Filtering out nulls
+ checkState(targetColumns.nonEmpty)
+
+ // TODO encoding should be done internally w/in HoodieBackedTableMetadata
+ val encodedTargetColumnNames = targetColumns.map(colName => new
ColumnIndexID(colName).asBase64EncodedString())
+
+ val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
+ metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava,
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
+
+ val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+ // NOTE: Explicit conversion is required for Scala 2.11
+ metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
+ toScalaOption(record.getData.getInsertValue(null, null))
+ .map(metadataRecord =>
metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
+ .orNull
+ }))
+ .filter(JFunction.toJavaSerializableFunction(columnStatsRecord =>
columnStatsRecord != null))
+
+ columnStatsRecords
+ }
+
+ private def loadFullColumnStatsIndexInternal(): DataFrame = {
+ val metadataTablePath =
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2.toString)
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
- spark.read.format("org.apache.hudi")
+ val colStatsDF = spark.read.format("org.apache.hudi")
.options(metadataConfig.getProps.asScala)
.load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
- }
-
- private def readColumnStatsIndexForColumnsInternal(spark: SparkSession,
targetColumns: Seq[String], metadataConfig: HoodieMetadataConfig,
tableBasePath: String) = {
- val ctx = new HoodieSparkEngineContext(new
JavaSparkContext(spark.sparkContext))
- // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]] by
- // - Fetching the records from CSI by key-prefixes (encoded column
names)
- // - Deserializing fetched records into [[InternalRow]]s
- // - Composing [[DataFrame]]
- val metadataTableDF = {
- val metadataTable = HoodieTableMetadata.create(ctx, metadataConfig,
tableBasePath, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-
- // TODO encoding should be done internally w/in HoodieBackedTableMetadata
- val encodedTargetColumnNames = targetColumns.map(colName => new
ColumnIndexID(colName).asBase64EncodedString())
-
- val recordsRDD: RDD[HoodieRecord[HoodieMetadataPayload]] =
- HoodieJavaRDD.getJavaRDD(
-
metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava,
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
- )
-
- val catalystRowsRDD: RDD[InternalRow] = recordsRDD.mapPartitions { it =>
- val metadataRecordSchema = new
Parser().parse(metadataRecordSchemaString)
- val converter =
AvroConversionUtils.createAvroToInternalRowConverter(metadataRecordSchema,
metadataRecordStructType)
-
- it.map { record =>
- // schema and props are ignored for generating metadata record from
the payload
- // instead, the underlying file system, or bloom filter, or columns
stats metadata (part of payload) are directly used
- toScalaOption(record.getData.getInsertValue(null, null))
- .flatMap(avroRecord =>
converter(avroRecord.asInstanceOf[GenericRecord]))
- .orNull
- }
- }
+ val requiredIndexColumns =
+ targetColumnStatsIndexColumns.map(colName =>
+
col(s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}"))
- HoodieUnsafeRDDUtils.createDataFrame(spark, catalystRowsRDD,
metadataRecordStructType)
- }
- metadataTableDF
+
colStatsDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+ .select(requiredIndexColumns: _*)
}
}
object ColumnStatsIndexSupport {
- private val metadataRecordSchemaString: String =
HoodieMetadataRecord.SCHEMA$.toString
- private val metadataRecordStructType: StructType =
AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataRecord.SCHEMA$)
+ private val expectedAvroSchemaValues = Set("BooleanWrapper", "IntWrapper",
"LongWrapper", "FloatWrapper", "DoubleWrapper",
+ "BytesWrapper", "StringWrapper", "DateWrapper", "DecimalWrapper",
"TimeMicrosWrapper", "TimestampMicrosWrapper")
+
+ /**
+ * Target Column Stats Index columns which internally are mapped onto fields of the corresponding
+ * Column Stats record payload ([[HoodieMetadataColumnStats]]) persisted w/in Metadata Table
+ */
+ private val targetColumnStatsIndexColumns = Seq(
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME
+ )
+
+ private val columnStatsRecordStructType: StructType =
AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataColumnStats.SCHEMA$)
/**
* @VisibleForTesting
@@ -300,13 +417,28 @@ object ColumnStatsIndexSupport {
@inline private def composeColumnStatStructType(col: String, statName:
String, dataType: DataType) =
StructField(formatColName(col, statName), dataType, nullable = true,
Metadata.empty)
- private def tryUnpackNonNullVal(statStruct: Row): (Any, Int) =
- statStruct.toSeq.zipWithIndex
- .find(_._1 != null)
- // NOTE: First non-null value will be a wrapper (converted into Row),
bearing a single
- // value
- .map { case (value, ord) => (value.asInstanceOf[Row].get(0), ord)}
- .getOrElse((null, -1))
+ private def tryUnpackValueWrapper(valueWrapper: AnyRef): Any = {
+ valueWrapper match {
+ case w: BooleanWrapper => w.getValue
+ case w: IntWrapper => w.getValue
+ case w: LongWrapper => w.getValue
+ case w: FloatWrapper => w.getValue
+ case w: DoubleWrapper => w.getValue
+ case w: BytesWrapper => w.getValue
+ case w: StringWrapper => w.getValue
+ case w: DateWrapper => w.getValue
+ case w: DecimalWrapper => w.getValue
+ case w: TimeMicrosWrapper => w.getValue
+ case w: TimestampMicrosWrapper => w.getValue
+
+ case r: GenericData.Record if
expectedAvroSchemaValues.contains(r.getSchema.getName) =>
+ r.get("value")
+
+ case _ => throw new UnsupportedOperationException(s"Not recognized value
wrapper type (${valueWrapper.getClass.getSimpleName})")
+ }
+ }
+
+ val decConv = new DecimalConversion()
private def deserialize(value: Any, dataType: DataType): Any = {
dataType match {
@@ -315,12 +447,37 @@ object ColumnStatsIndexSupport {
// here we have to decode those back into corresponding logical
representation.
case TimestampType =>
DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long])
case DateType => DateTimeUtils.toJavaDate(value.asInstanceOf[Int])
-
+ // Standard types
+ case StringType => value
+ case BooleanType => value
+ // Numeric types
+ case FloatType => value
+ case DoubleType => value
+ case LongType => value
+ case IntegerType => value
// NOTE: All integral types of size less than Int are encoded as Ints in
MT
case ShortType => value.asInstanceOf[Int].toShort
case ByteType => value.asInstanceOf[Int].toByte
- case _ => value
+ // TODO fix
+ case _: DecimalType =>
+ value match {
+ case buffer: ByteBuffer =>
+ val logicalType =
DecimalWrapper.SCHEMA$.getField("value").schema().getLogicalType
+ decConv.fromBytes(buffer, null, logicalType)
+ case _ => value
+ }
+ case BinaryType =>
+ value match {
+ case b: ByteBuffer =>
+ val bytes = new Array[Byte](b.remaining)
+ b.get(bytes)
+ bytes
+ case other => other
+ }
+
+ case _ =>
+ throw new UnsupportedOperationException(s"Data type for the statistic
value is not recognized $dataType")
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala
new file mode 100644
index 0000000000..0f41dc1fff
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.data.HoodieData
+import org.apache.spark.sql.Dataset
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel._
+
+object HoodieCatalystUtils extends SparkAdapterSupport {
+
+ /**
+ * Executes provided function while keeping provided [[Dataset]] instance
persisted for the
+ * duration of the execution
+ *
+ * @param df target [[Dataset]] to be persisted
+ * @param level desired [[StorageLevel]] of the persistence
+ * @param f target function to be executed while [[Dataset]] is kept
persisted
+ * @tparam T return value of the target function
+ * @return execution outcome of the [[f]] function
+ */
+ def withPersistedDataset[T](df: Dataset[_], level: StorageLevel =
MEMORY_AND_DISK)(f: => T): T = {
+ df.persist(level)
+ try {
+ f
+ } finally {
+ df.unpersist()
+ }
+ }
+
+ /**
+ * Executes provided function while keeping provided [[HoodieData]] instance
persisted for the
+ * duration of the execution
+ *
+ * @param data target [[Dataset]] to be persisted
+ * @param level desired [[StorageLevel]] of the persistence
+ * @param f target function to be executed while [[Dataset]] is kept
persisted
+ * @tparam T return value of the target function
+ * @return execution outcome of the [[f]] function
+ */
+ def withPersistedData[T](data: HoodieData[_], level: StorageLevel =
MEMORY_AND_DISK)(f: => T): T = {
+ data.persist(sparkAdapter.convertStorageLevelToString(level))
+ try {
+ f
+ } finally {
+ data.unpersist()
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index dad7c17650..09f1fac2c8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -27,11 +27,10 @@ import org.apache.hudi.keygen.{BuiltinKeyGenerator,
SparkKeyGeneratorInterface}
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath,
getNestedInternalRowValue}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters.asScalaBufferConverter
@@ -92,9 +91,9 @@ object HoodieDatasetBulkInsertHelper extends Logging {
val updatedDF = if (populateMetaFields &&
config.shouldCombineBeforeInsert) {
val dedupedRdd = dedupeRows(prependedRdd, updatedSchema,
config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
- HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, dedupedRdd,
updatedSchema)
+ HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd,
updatedSchema)
} else {
- HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, prependedRdd,
updatedSchema)
+ HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, prependedRdd,
updatedSchema)
}
val trimmedDF = if (shouldDropPartitionColumns) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala
deleted file mode 100644
index a6c689610b..0000000000
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
-
-object HoodieDatasetUtils {
-
- /**
- * Executes provided function while keeping provided [[DataFrame]] instance
persisted for the
- * duration of the execution
- *
- * @param df target [[DataFrame]] to be persisted
- * @param level desired [[StorageLevel]] of the persistence
- * @param f target function to be executed while [[DataFrame]] is kept
persisted
- * @tparam T return value of the target function
- * @return execution outcome of the [[f]] function
- */
- def withPersistence[T](df: DataFrame, level: StorageLevel =
MEMORY_AND_DISK)(f: => T): T = {
- df.persist(level)
- try {
- f
- } finally {
- df.unpersist()
- }
- }
-}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index bd15cb250f..4e158aaa86 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -18,7 +18,6 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hudi.HoodieDatasetUtils.withPersistence
import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode,
collectReferencedColumns, getConfigProperties}
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -26,7 +25,7 @@ import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator,
TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.{HoodieMetadataPayload,
HoodieTableMetadataUtil}
+import org.apache.hudi.metadata.HoodieMetadataPayload
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
@@ -35,7 +34,7 @@ import
org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndex
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
@@ -80,8 +79,9 @@ case class HoodieFileIndex(spark: SparkSession,
specifiedQueryInstant =
options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
)
- with FileIndex
- with ColumnStatsIndexSupport {
+ with FileIndex {
+
+ @transient private lazy val columnStatsIndex = new
ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
override def rootPaths: Seq[Path] = queryPaths.asScala
@@ -95,8 +95,9 @@ case class HoodieFileIndex(spark: SparkSession,
*/
def allFiles: Seq[FileStatus] = {
cachedAllInputFileSlices.values.asScala.flatMap(_.asScala)
- .filter(_.getBaseFile.isPresent)
- .map(_.getBaseFile.get().getFileStatus)
+ .map(fs => fs.getBaseFile.orElse(null))
+ .filter(_ != null)
+ .map(_.getFileStatus)
.toSeq
}
@@ -196,64 +197,63 @@ case class HoodieFileIndex(spark: SparkSession,
// nothing CSI in particular could be applied for)
lazy val queryReferencedColumns = collectReferencedColumns(spark,
queryFilters, schema)
- if (!isMetadataTableEnabled || !isColumnStatsIndexAvailable ||
!isDataSkippingEnabled) {
+ if (!isMetadataTableEnabled || !isDataSkippingEnabled ||
!columnStatsIndex.isIndexAvailable) {
validateConfig()
Option.empty
} else if (queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
Option.empty
} else {
- val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath,
metadataConfig, queryReferencedColumns)
-
- // Persist DF to avoid re-computing column statistics unraveling
- withPersistence(colStatsDF) {
- val transposedColStatsDF: DataFrame = transposeColumnStatsIndex(spark,
colStatsDF, queryReferencedColumns, schema)
-
- // Persist DF to avoid re-computing column statistics unraveling
- withPersistence(transposedColStatsDF) {
- val indexSchema = transposedColStatsDF.schema
- val indexFilter =
- queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_,
indexSchema))
- .reduce(And)
-
- val allIndexedFileNames =
-
transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- .collect()
- .map(_.getString(0))
- .toSet
-
- val prunedCandidateFileNames =
- transposedColStatsDF.where(new Column(indexFilter))
- .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- .collect()
- .map(_.getString(0))
- .toSet
-
- // NOTE: Col-Stats Index isn't guaranteed to have complete set of
statistics for every
- // base-file: since it's bound to clustering, which could
occur asynchronously
- // at arbitrary point in time, and is not likely to be
touching all of the base files.
- //
- // To close that gap, we manually compute the difference b/w
all indexed (by col-stats-index)
- // files and all outstanding base-files, and make sure that
all base files not
- // represented w/in the index are included in the output of
this method
- val notIndexedFileNames =
lookupFileNamesMissingFromIndex(allIndexedFileNames)
-
- Some(prunedCandidateFileNames ++ notIndexedFileNames)
- }
+ // NOTE: Since executing on-cluster via the Spark API carries its own
non-trivial overhead,
+ //       it's most often preferable to fetch the Column Stats Index w/in
the same process (usually the driver),
+ //       w/o resorting to on-cluster execution.
+ //       For that we use a simple heuristic to determine whether we
should read and process CSI in-memory or
+ //       on-cluster: the total number of rows of the expected projected
portion of the index has to be below the
+ //       threshold (of 100k records)
+ val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this,
queryReferencedColumns)
+
+ columnStatsIndex.loadTransposed(queryReferencedColumns,
shouldReadInMemory) { transposedColStatsDF =>
+ val indexSchema = transposedColStatsDF.schema
+ val indexFilter =
+ queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_,
indexSchema))
+ .reduce(And)
+
+ val allIndexedFileNames =
+
transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet
+
+ val prunedCandidateFileNames =
+ transposedColStatsDF.where(new Column(indexFilter))
+ .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet
+
+ // NOTE: Col-Stats Index isn't guaranteed to have a complete set of
statistics for every
+ //       base-file: it's bound to clustering, which could occur
asynchronously
+ //       at an arbitrary point in time, and is not likely to touch
all of the base files.
+ //
+ //       To close that gap, we manually compute the difference b/w all
indexed (by col-stats-index)
+ //       files and all outstanding base-files, and make sure that all
base files not
+ //       represented w/in the index are included in the output of this
method
+ val notIndexedFileNames =
lookupFileNamesMissingFromIndex(allIndexedFileNames)
+
+ Some(prunedCandidateFileNames ++ notIndexedFileNames)
}
}
}
- override def refresh(): Unit = super.refresh()
+ override def refresh(): Unit = {
+ super.refresh()
+ columnStatsIndex.invalidateCaches()
+ }
override def inputFiles: Array[String] =
allFiles.map(_.getPath.toString).toArray
override def sizeInBytes: Long = cachedFileSize
- private def isColumnStatsIndexAvailable =
- metaClient.getTableConfig.getMetadataPartitions
- .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
-
private def isDataSkippingEnabled: Boolean =
options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
"false")).toBoolean
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
similarity index 50%
rename from
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
rename to
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
index 8995701d5f..bd7f2f5456 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
@@ -21,17 +21,54 @@ package org.apache.spark.sql
import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.MutablePair
/**
* Suite of utilities helping in handling instances of [[HoodieUnsafeRDD]]
*/
-object HoodieUnsafeRDDUtils {
+object HoodieUnsafeUtils {
- // TODO scala-doc
- def createDataFrame(spark: SparkSession, rdd: RDD[InternalRow], structType:
StructType): DataFrame =
- spark.internalCreateDataFrame(rdd, structType)
+ /**
+ * Creates [[DataFrame]] from the in-memory [[Seq]] of [[Row]]s with
provided [[schema]]
+ *
+ * NOTE: [[DataFrame]] is based on [[LocalRelation]], entailing that most
computations with it
+ * will be executed by Spark locally
+ *
+ * @param spark Spark session
+ * @param rows collection of rows to base [[DataFrame]] on
+ * @param schema target [[DataFrame]]'s schema
+ * @return [[DataFrame]] holding the provided rows
+ */
+ def createDataFrameFromRows(spark: SparkSession, rows: Seq[Row], schema:
StructType): DataFrame =
+ Dataset.ofRows(spark, LocalRelation.fromExternalRows(schema.toAttributes,
rows))
+
+ /**
+ * Creates [[DataFrame]] from the in-memory [[Seq]] of [[InternalRow]]s with
provided [[schema]]
+ *
+ * NOTE: [[DataFrame]] is based on [[LocalRelation]], entailing that most
computations with it
+ * will be executed by Spark locally
+ *
+ * @param spark Spark session
+ * @param rows collection of rows to base [[DataFrame]] on
+ * @param schema target [[DataFrame]]'s schema
+ * @return [[DataFrame]] holding the provided rows
+ */
+ def createDataFrameFromInternalRows(spark: SparkSession, rows:
Seq[InternalRow], schema: StructType): DataFrame =
+ Dataset.ofRows(spark, LocalRelation(schema.toAttributes, rows))
+
+
+ /**
+ * Creates [[DataFrame]] from the [[RDD]] of [[Row]]s with provided
[[schema]]
+ *
+ * @param spark Spark session
+ * @param rdd RDD w/ [[Row]]s to base [[DataFrame]] on
+ * @param schema target [[DataFrame]]'s schema
+ * @return [[DataFrame]] holding rows of the provided [[RDD]]
+ */
+ def createDataFrameFromRDD(spark: SparkSession, rdd: RDD[InternalRow],
schema: StructType): DataFrame =
+ spark.internalCreateDataFrame(rdd, schema)
/**
* Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]],
returning a properly
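
A short usage sketch (outside the patch) of the renamed helpers; the schema and rows are
made up, while the createDataFrameFromRows signature follows the code above:

  import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SparkSession}
  import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

  val spark = SparkSession.builder().appName("example").master("local[1]").getOrCreate()

  val schema = StructType(Seq(
    StructField("fileName", StringType),
    StructField("valueCount", LongType)))

  // LocalRelation-backed DataFrame: subsequent operations are evaluated locally,
  // which is what the in-memory column-stats read path relies on
  val localDF = HoodieUnsafeUtils.createDataFrameFromRows(
    spark, Seq(Row("f1.parquet", 10L), Row("f2.parquet", 20L)), schema)

  localDF.where("valueCount > 15").show()
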
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 1d4dbfb1ea..19027a47bf 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -369,8 +369,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient)
case class TestCase(enableMetadata: Boolean,
- enableColumnStats: Boolean,
- enableDataSkipping: Boolean)
+ enableColumnStats: Boolean,
+ enableDataSkipping: Boolean,
+ columnStatsProcessingModeOverride: String = null)
val testCases: Seq[TestCase] =
TestCase(enableMetadata = false, enableColumnStats = false,
enableDataSkipping = false) ::
@@ -378,6 +379,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
TestCase(enableMetadata = true, enableColumnStats = false,
enableDataSkipping = true) ::
TestCase(enableMetadata = false, enableColumnStats = true,
enableDataSkipping = true) ::
TestCase(enableMetadata = true, enableColumnStats = true,
enableDataSkipping = true) ::
+ TestCase(enableMetadata = true, enableColumnStats = true,
enableDataSkipping = true, columnStatsProcessingModeOverride =
HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY) ::
+ TestCase(enableMetadata = true, enableColumnStats = true,
enableDataSkipping = true, columnStatsProcessingModeOverride =
HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE) ::
Nil
for (testCase <- testCases) {
@@ -391,7 +394,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
val props = Map[String, String](
"path" -> basePath,
QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
- DataSourceReadOptions.ENABLE_DATA_SKIPPING.key ->
testCase.enableDataSkipping.toString
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key ->
testCase.enableDataSkipping.toString,
+ HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE.key
-> testCase.columnStatsProcessingModeOverride
) ++ readMetadataOpts
val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props,
NoopCache)
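
Outside the patch, a hedged sketch of how a reader could pin the new processing mode; the
constants come from the diff, while the table path, filter column, and the use of
HoodieMetadataConfig.ENABLE as a read option are assumptions for illustration:

  import org.apache.hudi.DataSourceReadOptions
  import org.apache.hudi.common.config.HoodieMetadataConfig

  // assumes an existing SparkSession `spark` and a Hudi table at the given path
  val df = spark.read.format("hudi")
    .option(HoodieMetadataConfig.ENABLE.key, "true")
    .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true")
    .option(HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE.key,
      HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY)
    .load("/tmp/hudi_table")   // hypothetical path
    .where("c1 > 100")         // filter eligible for data skipping
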
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index b982b1851c..822d2051cb 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -31,12 +31,12 @@ import
org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql._
-import org.apache.spark.sql.functions.typedLit
+import org.apache.spark.sql.functions.{col, lit, typedLit}
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull,
assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.junit.jupiter.params.provider.{Arguments, ArgumentsSource,
MethodSource, ValueSource}
import java.math.BigInteger
import java.sql.{Date, Timestamp}
@@ -44,7 +44,7 @@ import scala.collection.JavaConverters._
import scala.util.Random
@Tag("functional")
-class TestColumnStatsIndex extends HoodieClientTestBase with
ColumnStatsIndexSupport {
+class TestColumnStatsIndex extends HoodieClientTestBase {
var spark: SparkSession = _
val sourceTableSchema =
@@ -119,35 +119,31 @@ class TestColumnStatsIndex extends HoodieClientTestBase
with ColumnStatsIndexSup
.fromProperties(toProperties(metadataOpts))
.build()
- val requestedColumns: Seq[String] = {
- // Providing empty seq of columns to [[readColumnStatsIndex]] will lead
to the whole
- // MT to be read, and subsequently filtered
- if (testCase.readFullMetadataTable) Seq.empty
- else sourceTableSchema.fieldNames
- }
+ val requestedColumns: Seq[String] = sourceTableSchema.fieldNames
- val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig,
requestedColumns)
- val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF,
sourceTableSchema.fieldNames, sourceTableSchema)
+ val columnStatsIndex = new ColumnStatsIndexSupport(spark,
sourceTableSchema, metadataConfig, metaClient)
val expectedColStatsSchema =
composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
- // Match against expected column stats table
- val expectedColStatsIndexTableDf =
- spark.read
- .schema(expectedColStatsSchema)
-
.json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString)
+ columnStatsIndex.loadTransposed(requestedColumns,
testCase.shouldReadInMemory) { transposedColStatsDF =>
+ // Match against expected column stats table
+ val expectedColStatsIndexTableDf =
+ spark.read
+ .schema(expectedColStatsSchema)
+
.json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString)
- assertEquals(expectedColStatsIndexTableDf.schema,
transposedColStatsDF.schema)
- // NOTE: We have to drop the `fileName` column as it contains semi-random
components
- // that we can't control in this test. Nevertheless, since we
manually verify composition of the
- // ColStats Index by reading Parquet footers from individual Parquet
files, this is not an issue
- assertEquals(asJson(sort(expectedColStatsIndexTableDf)),
asJson(sort(transposedColStatsDF.drop("fileName"))))
+ assertEquals(expectedColStatsIndexTableDf.schema,
transposedColStatsDF.schema)
+ // NOTE: We have to drop the `fileName` column as it contains
semi-random components
+ // that we can't control in this test. Nevertheless, since we
manually verify composition of the
+ // ColStats Index by reading Parquet footers from individual
Parquet files, this is not an issue
+ assertEquals(asJson(sort(expectedColStatsIndexTableDf)),
asJson(sort(transposedColStatsDF.drop("fileName"))))
- // Collect Column Stats manually (reading individual Parquet files)
- val manualColStatsTableDF =
- buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames,
expectedColStatsSchema)
+ // Collect Column Stats manually (reading individual Parquet files)
+ val manualColStatsTableDF =
+ buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames,
sourceTableSchema.fieldNames, expectedColStatsSchema)
- assertEquals(asJson(sort(manualColStatsTableDF)),
asJson(sort(transposedColStatsDF)))
+ assertEquals(asJson(sort(manualColStatsTableDF)),
asJson(sort(transposedColStatsDF)))
+ }
// do an upsert and validate
val updateJSONTablePath =
getClass.getClassLoader.getResource("index/colstats/another-input-table-json").toString
@@ -166,26 +162,28 @@ class TestColumnStatsIndex extends HoodieClientTestBase
with ColumnStatsIndexSup
metaClient = HoodieTableMetaClient.reload(metaClient)
- val updatedColStatsDF = readColumnStatsIndex(spark, basePath,
metadataConfig, requestedColumns)
- val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark,
updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
+ val updatedColumnStatsIndex = new ColumnStatsIndexSupport(spark,
sourceTableSchema, metadataConfig, metaClient)
- val expectedColStatsIndexUpdatedDF =
- spark.read
- .schema(expectedColStatsSchema)
-
.json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString)
+ updatedColumnStatsIndex.loadTransposed(requestedColumns,
testCase.shouldReadInMemory) { transposedUpdatedColStatsDF =>
+ val expectedColStatsIndexUpdatedDF =
+ spark.read
+ .schema(expectedColStatsSchema)
+
.json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString)
- assertEquals(expectedColStatsIndexUpdatedDF.schema,
transposedUpdatedColStatsDF.schema)
- assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)),
asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
+ assertEquals(expectedColStatsIndexUpdatedDF.schema,
transposedUpdatedColStatsDF.schema)
+ assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)),
asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
- // Collect Column Stats manually (reading individual Parquet files)
- val manualUpdatedColStatsTableDF =
- buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames,
expectedColStatsSchema)
+ // Collect Column Stats manually (reading individual Parquet files)
+ val manualUpdatedColStatsTableDF =
+ buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames,
sourceTableSchema.fieldNames, expectedColStatsSchema)
- assertEquals(asJson(sort(manualUpdatedColStatsTableDF)),
asJson(sort(transposedUpdatedColStatsDF)))
+ assertEquals(asJson(sort(manualUpdatedColStatsTableDF)),
asJson(sort(transposedUpdatedColStatsDF)))
+ }
}
- @Test
- def testMetadataColumnStatsIndexPartialProjection(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory:
Boolean): Unit = {
val targetColumnsToIndex = Seq("c1", "c2", "c3")
val metadataOpts = Map(
@@ -235,11 +233,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase
with ColumnStatsIndexSup
// These are NOT indexed
val requestedColumns = Seq("c4")
- val emptyColStatsDF = readColumnStatsIndex(spark, basePath,
metadataConfig, requestedColumns)
- val emptyTransposedColStatsDF = transposeColumnStatsIndex(spark,
emptyColStatsDF, requestedColumns, sourceTableSchema)
+ val columnStatsIndex = new ColumnStatsIndexSupport(spark,
sourceTableSchema, metadataConfig, metaClient)
- assertEquals(0, emptyColStatsDF.collect().length)
- assertEquals(0, emptyTransposedColStatsDF.collect().length)
+ columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) {
emptyTransposedColStatsDF =>
+ assertEquals(0, emptyTransposedColStatsDF.collect().length)
+ }
}
////////////////////////////////////////////////////////////////////////
@@ -252,29 +250,27 @@ class TestColumnStatsIndex extends HoodieClientTestBase
with ColumnStatsIndexSup
// We have to include "c1", since we sort the expected outputs by this
column
val requestedColumns = Seq("c4", "c1")
- val partialColStatsDF = readColumnStatsIndex(spark, basePath,
metadataConfig, requestedColumns)
- val partialTransposedColStatsDF = transposeColumnStatsIndex(spark,
partialColStatsDF, requestedColumns, sourceTableSchema)
-
- val targetIndexedColumns =
targetColumnsToIndex.intersect(requestedColumns)
- val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns,
sourceTableSchema)
-
+ val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted,
sourceTableSchema)
// Match against expected column stats table
val expectedColStatsIndexTableDf =
spark.read
.schema(expectedColStatsSchema)
.json(getClass.getClassLoader.getResource("index/colstats/partial-column-stats-index-table.json").toString)
- assertEquals(expectedColStatsIndexTableDf.schema,
partialTransposedColStatsDF.schema)
- // NOTE: We have to drop the `fileName` column as it contains
semi-random components
- // that we can't control in this test. Nevertheless, since we
manually verify composition of the
- // ColStats Index by reading Parquet footers from individual
Parquet files, this is not an issue
- assertEquals(asJson(sort(expectedColStatsIndexTableDf)),
asJson(sort(partialTransposedColStatsDF.drop("fileName"))))
-
// Collect Column Stats manually (reading individual Parquet files)
val manualColStatsTableDF =
- buildColumnStatsTableManually(basePath, targetIndexedColumns,
expectedColStatsSchema)
+ buildColumnStatsTableManually(basePath, requestedColumns,
targetColumnsToIndex, expectedColStatsSchema)
+
+ val columnStatsIndex = new ColumnStatsIndexSupport(spark,
sourceTableSchema, metadataConfig, metaClient)
- assertEquals(asJson(sort(manualColStatsTableDF)),
asJson(sort(partialTransposedColStatsDF)))
+ columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) {
partialTransposedColStatsDF =>
+ assertEquals(expectedColStatsIndexTableDf.schema,
partialTransposedColStatsDF.schema)
+ // NOTE: We have to drop the `fileName` column as it contains
semi-random components
+ // that we can't control in this test. Nevertheless, since we
manually verify composition of the
+ // ColStats Index by reading Parquet footers from individual
Parquet files, this is not an issue
+ assertEquals(asJson(sort(expectedColStatsIndexTableDf)),
asJson(sort(partialTransposedColStatsDF.drop("fileName"))))
+ assertEquals(asJson(sort(manualColStatsTableDF)),
asJson(sort(partialTransposedColStatsDF)))
+ }
}
////////////////////////////////////////////////////////////////////////
@@ -307,27 +303,26 @@ class TestColumnStatsIndex extends HoodieClientTestBase
with ColumnStatsIndexSup
val requestedColumns = sourceTableSchema.fieldNames
- // Nevertheless, the last update was written with a new schema (that is
a subset of the original table schema),
- // we should be able to read CSI, which will be properly padded (with
nulls) after transposition
- val updatedColStatsDF = readColumnStatsIndex(spark, basePath,
metadataConfig, requestedColumns)
- val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark,
updatedColStatsDF, requestedColumns, sourceTableSchema)
-
- val targetIndexedColumns =
targetColumnsToIndex.intersect(requestedColumns)
- val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns,
sourceTableSchema)
-
+ val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted,
sourceTableSchema)
val expectedColStatsIndexUpdatedDF =
spark.read
.schema(expectedColStatsSchema)
.json(getClass.getClassLoader.getResource("index/colstats/updated-partial-column-stats-index-table.json").toString)
- assertEquals(expectedColStatsIndexUpdatedDF.schema,
transposedUpdatedColStatsDF.schema)
- assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)),
asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
-
// Collect Column Stats manually (reading individual Parquet files)
val manualUpdatedColStatsTableDF =
- buildColumnStatsTableManually(basePath, targetIndexedColumns,
expectedColStatsSchema)
+ buildColumnStatsTableManually(basePath, requestedColumns,
targetColumnsToIndex, expectedColStatsSchema)
- assertEquals(asJson(sort(manualUpdatedColStatsTableDF)),
asJson(sort(transposedUpdatedColStatsDF)))
+ val columnStatsIndex = new ColumnStatsIndexSupport(spark,
sourceTableSchema, metadataConfig, metaClient)
+
+ // Even though the last update was written with a new schema (that is
a subset of the original table schema),
+ // we should still be able to read CSI, which will be properly padded (with
nulls) after transposition
+ columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) {
transposedUpdatedColStatsDF =>
+ assertEquals(expectedColStatsIndexUpdatedDF.schema,
transposedUpdatedColStatsDF.schema)
+
+ assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)),
asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
+ assertEquals(asJson(sort(manualUpdatedColStatsTableDF)),
asJson(sort(transposedUpdatedColStatsDF)))
+ }
}
}
@@ -370,7 +365,10 @@ class TestColumnStatsIndex extends HoodieClientTestBase
with ColumnStatsIndexSup
})
}
- private def buildColumnStatsTableManually(tablePath: String, indexedCols:
Seq[String], indexSchema: StructType) = {
+ private def buildColumnStatsTableManually(tablePath: String,
+ includedCols: Seq[String],
+ indexedCols: Seq[String],
+ indexSchema: StructType):
DataFrame = {
val files = {
val it = fs.listFiles(new Path(tablePath), true)
var seq = Seq[LocatedFileStatus]()
@@ -387,15 +385,23 @@ class TestColumnStatsIndex extends HoodieClientTestBase
with ColumnStatsIndexSup
s"'${typedLit(file.getPath.getName)}' AS file" +:
s"sum(1) AS valueCount" +:
df.columns
- .filter(col => indexedCols.contains(col))
+ .filter(col => includedCols.contains(col))
.flatMap(col => {
val minColName = s"${col}_minValue"
val maxColName = s"${col}_maxValue"
- Seq(
- s"min($col) AS $minColName",
- s"max($col) AS $maxColName",
- s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount"
- )
+ if (indexedCols.contains(col)) {
+ Seq(
+ s"min($col) AS $minColName",
+ s"max($col) AS $maxColName",
+ s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount"
+ )
+ } else {
+ Seq(
+ s"null AS $minColName",
+ s"null AS $maxColName",
+ s"null AS ${col}_nullCount"
+ )
+ }
})
df.selectExpr(exprs: _*)
@@ -461,11 +467,13 @@ class TestColumnStatsIndex extends HoodieClientTestBase
with ColumnStatsIndexSup
object TestColumnStatsIndex {
- case class ColumnStatsTestCase(forceFullLogScan: Boolean,
readFullMetadataTable: Boolean)
+ case class ColumnStatsTestCase(forceFullLogScan: Boolean,
shouldReadInMemory: Boolean)
def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] =
java.util.stream.Stream.of(
- Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false,
readFullMetadataTable = false)),
- Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true,
readFullMetadataTable = true))
+ Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false,
shouldReadInMemory = true)),
+ Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false,
shouldReadInMemory = false)),
+ Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true,
shouldReadInMemory = false)),
+ Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true,
shouldReadInMemory = true))
)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index b24a341d4a..00ab709144 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.{DataSourceReadOptions,
DataSourceWriteOptions, DefaultSo
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode}
+import org.apache.spark.sql.{Dataset, HoodieUnsafeUtils, Row, SaveMode}
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.{Disabled, Tag, Test}
@@ -316,7 +316,7 @@ class TestParquetColumnProjection extends
SparkClientFunctionalTestHarness with
val (rows, bytesRead) = measureBytesRead { () =>
val rdd = relation.buildScan(targetColumns,
Array.empty).asInstanceOf[HoodieUnsafeRDD]
- HoodieUnsafeRDDUtils.collect(rdd)
+ HoodieUnsafeUtils.collect(rdd)
}
val targetRecordCount = tableState.targetRecordCount;
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 30af252d2d..3c0282d710 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -22,9 +22,13 @@ import org.apache.avro.Schema
import org.apache.hudi.Spark2RowSerDe
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.avro._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression,
InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join,
LogicalPlan}
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark24HoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{FilePartition,
PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil}
import org.apache.spark.sql.hudi.SparkAdapter
@@ -32,6 +36,8 @@ import
org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils,
HoodieCatalystPlansUtils, HoodieSpark2CatalystExpressionUtils,
HoodieSpark2CatalystPlanUtils, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel._
import scala.collection.mutable.ArrayBuffer
@@ -115,4 +121,20 @@ class Spark2Adapter extends SparkAdapter {
override def createInterpretedPredicate(e: Expression): InterpretedPredicate
= {
InterpretedPredicate.create(e)
}
+
+ override def convertStorageLevelToString(level: StorageLevel): String =
level match {
+ case NONE => "NONE"
+ case DISK_ONLY => "DISK_ONLY"
+ case DISK_ONLY_2 => "DISK_ONLY_2"
+ case MEMORY_ONLY => "MEMORY_ONLY"
+ case MEMORY_ONLY_2 => "MEMORY_ONLY_2"
+ case MEMORY_ONLY_SER => "MEMORY_ONLY_SER"
+ case MEMORY_ONLY_SER_2 => "MEMORY_ONLY_SER_2"
+ case MEMORY_AND_DISK => "MEMORY_AND_DISK"
+ case MEMORY_AND_DISK_2 => "MEMORY_AND_DISK_2"
+ case MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER"
+ case MEMORY_AND_DISK_SER_2 => "MEMORY_AND_DISK_SER_2"
+ case OFF_HEAP => "OFF_HEAP"
+ case _ => throw new IllegalArgumentException(s"Invalid StorageLevel:
$level")
+ }
}
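
Outside the patch, a sketch of the intended round-trip: the adapter renders a StorageLevel
as its canonical name, which can be carried in a plain string config and restored with
Spark's StorageLevel.fromString; the helper below is illustrative only:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.storage.StorageLevel

  // persist a DataFrame using a storage level carried as a config string,
  // e.g. the value produced by convertStorageLevelToString(MEMORY_AND_DISK_SER)
  def persistWithConfiguredLevel(df: DataFrame, levelName: String): DataFrame =
    df.persist(StorageLevel.fromString(levelName))
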
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 034d21dba4..4f55039746 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -33,6 +33,8 @@ import
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{HoodieCatalystPlansUtils,
HoodieSpark3CatalystPlanUtils, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel.{DISK_ONLY, DISK_ONLY_2,
DISK_ONLY_3, MEMORY_AND_DISK, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER,
MEMORY_AND_DISK_SER_2, MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER,
MEMORY_ONLY_SER_2, NONE, OFF_HEAP}
import scala.util.control.NonFatal
@@ -100,4 +102,24 @@ abstract class BaseSpark3Adapter extends SparkAdapter with
Logging {
override def createInterpretedPredicate(e: Expression): InterpretedPredicate
= {
Predicate.createInterpreted(e)
}
+
+ /**
+ * Converts an instance of [[StorageLevel]] to its corresponding string representation
+ */
+ override def convertStorageLevelToString(level: StorageLevel): String =
level match {
+ case NONE => "NONE"
+ case DISK_ONLY => "DISK_ONLY"
+ case DISK_ONLY_2 => "DISK_ONLY_2"
+ case DISK_ONLY_3 => "DISK_ONLY_3"
+ case MEMORY_ONLY => "MEMORY_ONLY"
+ case MEMORY_ONLY_2 => "MEMORY_ONLY_2"
+ case MEMORY_ONLY_SER => "MEMORY_ONLY_SER"
+ case MEMORY_ONLY_SER_2 => "MEMORY_ONLY_SER_2"
+ case MEMORY_AND_DISK => "MEMORY_AND_DISK"
+ case MEMORY_AND_DISK_2 => "MEMORY_AND_DISK_2"
+ case MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER"
+ case MEMORY_AND_DISK_SER_2 => "MEMORY_AND_DISK_SER_2"
+ case OFF_HEAP => "OFF_HEAP"
+ case _ => throw new IllegalArgumentException(s"Invalid StorageLevel:
$level")
+ }
}