alexeykudinkin commented on code in PR #7528:
URL: https://github.com/apache/hudi/pull/7528#discussion_r1054026446
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java:
##########
@@ -57,15 +57,15 @@ public class HoodieSparkEngineContext extends
HoodieEngineContext {
private static final Logger LOG =
LogManager.getLogger(HoodieSparkEngineContext.class);
private final JavaSparkContext javaSparkContext;
- private SQLContext sqlContext;
+ private final SQLContext sqlContext;
public HoodieSparkEngineContext(JavaSparkContext jsc) {
- super(new SerializableConfiguration(jsc.hadoopConfiguration()), new
SparkTaskContextSupplier());
- this.javaSparkContext = jsc;
- this.sqlContext = SQLContext.getOrCreate(jsc.sc());
+ this(jsc, SQLContext.getOrCreate(jsc.sc()));
}
- public void setSqlContext(SQLContext sqlContext) {
+ public HoodieSparkEngineContext(JavaSparkContext jsc, SQLContext sqlContext)
{
Review Comment:
This change is needed to accommodate the fix for `HoodieClientTestHarness`
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -36,26 +36,29 @@ import org.apache.spark.sql.types.StructType
* [[BaseRelation]] implementation only reading Base files of Hudi tables,
essentially supporting following querying
* modes:
* <ul>
- * <li>For COW tables: Snapshot</li>
- * <li>For MOR tables: Read-optimized</li>
+ * <li>For COW tables: Snapshot</li>
+ * <li>For MOR tables: Read-optimized</li>
* </ul>
*
- * NOTE: The reason this Relation is used in liue of Spark's default
[[HadoopFsRelation]] is primarily due to the
+ * NOTE: The reason this Relation is used in-liue of Spark's default
[[HadoopFsRelation]] is primarily due to the
* fact that it injects real partition's path as the value of the partition
field, which Hudi ultimately persists
* as part of the record payload. In some cases, however, partition path might
not necessarily be equal to the
* verbatim value of the partition path field (when custom [[KeyGenerator]] is
used) therefore leading to incorrect
* partition field values being written
*/
-class BaseFileOnlyRelation(sqlContext: SQLContext,
- metaClient: HoodieTableMetaClient,
- optParams: Map[String, String],
- userSchema: Option[StructType],
- globPaths: Seq[Path])
- extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema)
with SparkAdapterSupport {
+case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
Review Comment:
The primary change here is converting the class to a case class, which in
turn means that all of the constructor parameters become fields and therefore
require the corresponding `override val` annotations
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala:
##########
@@ -42,12 +42,35 @@ import scala.collection.JavaConverters._
case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile],
logFiles: List[HoodieLogFile]) extends
HoodieFileSplit
-class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
- optParams: Map[String, String],
- userSchema: Option[StructType],
- globPaths: Seq[Path],
- metaClient: HoodieTableMetaClient)
- extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
+case class MergeOnReadSnapshotRelation(override val sqlContext: SQLContext,
Review Comment:
Same changes as other relations
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -383,50 +372,28 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
*/
protected def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[FileSplit]
- /**
- * Get all PartitionDirectories based on globPaths if specified, otherwise
use the table path.
- * Will perform pruning if necessary
- */
- private def listPartitionDirectories(globPaths: Seq[Path], partitionFilters:
Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
Review Comment:
Combined these two methods into one
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -427,6 +428,10 @@ class TestMORDataSource extends HoodieClientTestBase with
SparkDatasetMixin {
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO",
"SPARK"))
def testPrunedFiltered(recordType: HoodieRecordType) {
+
+ spark.sessionState.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
Review Comment:
Will be reverted
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java:
##########
@@ -209,7 +208,13 @@ protected void initSparkContexts(String appName) {
}))
.config(jsc.getConf())
.getOrCreate();
+
sqlContext = new SQLContext(sparkSession);
+ context = new HoodieSparkEngineContext(jsc, sqlContext);
Review Comment:
This change actually fixes a problem where `HoodieSparkSessionExtensions`
would not be properly injected into the created Spark session: session
creation was inadvertently hijacked in the `HoodieSparkEngineContext` ctor,
which pre-empted the session's creation by invoking
`SQLContext.getOrCreate(jsc.sc())`
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -518,14 +485,6 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
}
}
- protected def getColName(f: StructField): String = {
Review Comment:
Dead code
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -36,26 +36,29 @@ import org.apache.spark.sql.types.StructType
* [[BaseRelation]] implementation only reading Base files of Hudi tables,
essentially supporting following querying
* modes:
* <ul>
- * <li>For COW tables: Snapshot</li>
- * <li>For MOR tables: Read-optimized</li>
+ * <li>For COW tables: Snapshot</li>
+ * <li>For MOR tables: Read-optimized</li>
* </ul>
*
- * NOTE: The reason this Relation is used in liue of Spark's default
[[HadoopFsRelation]] is primarily due to the
+ * NOTE: The reason this Relation is used in-liue of Spark's default
[[HadoopFsRelation]] is primarily due to the
* fact that it injects real partition's path as the value of the partition
field, which Hudi ultimately persists
* as part of the record payload. In some cases, however, partition path might
not necessarily be equal to the
* verbatim value of the partition path field (when custom [[KeyGenerator]] is
used) therefore leading to incorrect
* partition field values being written
*/
-class BaseFileOnlyRelation(sqlContext: SQLContext,
- metaClient: HoodieTableMetaClient,
- optParams: Map[String, String],
- userSchema: Option[StructType],
- globPaths: Seq[Path])
- extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema)
with SparkAdapterSupport {
+case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
Review Comment:
The reason this class is converted to a case class is to avoid any in-place
mutations and instead have `updatePrunedDataSchema` produce a new instance
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala:
##########
@@ -40,13 +40,18 @@ import scala.collection.immutable
/**
* @Experimental
*/
-class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
- optParams: Map[String, String],
- userSchema: Option[StructType],
- metaClient: HoodieTableMetaClient)
- extends MergeOnReadSnapshotRelation(sqlContext, optParams, userSchema,
Seq(), metaClient) with HoodieIncrementalRelationTrait {
-
- override type FileSplit = HoodieMergeOnReadFileSplit
+case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
Review Comment:
Same changes as other relations
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]