yihua commented on code in PR #9717:
URL: https://github.com/apache/hudi/pull/9717#discussion_r1387220775
##########
hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java:
##########
@@ -318,7 +318,7 @@ public void testRemoveFields() {
schemaStr = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\":
\"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
- + "{\"name\": \"pii_col\", \"type\": \"string\"}]},";
+ + "{\"name\": \"pii_col\", \"type\": \"string\"}]}";
Review Comment:
Interesting. Does it fail the test before with the comma at the end of the
schema String?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -77,7 +77,16 @@ object HoodieAnalysis extends SparkAdapterSupport {
}
} else {
rules += adaptIngestionTargetLogicalRelations
- val dataSourceV2ToV1FallbackClass =
"org.apache.spark.sql.hudi.analysis.HoodieDataSourceV2ToV1Fallback"
+ val dataSourceV2ToV1FallbackClass = if (HoodieSparkUtils.isSpark3_5)
+
"org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback"
+ else if (HoodieSparkUtils.isSpark3_4)
+
"org.apache.spark.sql.hudi.analysis.HoodieSpark34DataSourceV2ToV1Fallback"
+ else if (HoodieSparkUtils.isSpark3_3)
+
"org.apache.spark.sql.hudi.analysis.HoodieSpark33DataSourceV2ToV1Fallback"
+ else {
+ // Spark 3.2.x
+
"org.apache.spark.sql.hudi.analysis.HoodieSpark32DataSourceV2ToV1Fallback"
+ }
Review Comment:
Could Spark 3.4 and below share the same fallback class?
##########
hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java:
##########
@@ -49,15 +47,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import java.util.stream.Collectors;
-
-import scala.collection.JavaConversions;
-import scala.collection.JavaConverters;
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
/**
* Dataset test utils.
+ * Note: This util class can be only used within `hudi-spark<spark_version>`
modules because it
+ * relies on SparkAdapterSupport to get encoder for different versions of
Spark. If used elsewhere this
+ * class won't be initialized properly amd could cause ClassNotFoundException
or NoClassDefFoundError
Review Comment:
```suggestion
* class won't be initialized properly and could cause
ClassNotFoundException or NoClassDefFoundError
```
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/DataFrameUtil.scala:
##########
@@ -31,7 +33,7 @@ object DataFrameUtil {
*/
def createFromInternalRows(sparkSession: SparkSession, schema:
StructType, rdd: RDD[InternalRow]): DataFrame = {
- val logicalPlan = LogicalRDD(schema.toAttributes, rdd)(sparkSession)
+ val logicalPlan =
LogicalRDD(SparkAdapterSupport.sparkAdapter.toAttributes(schema),
rdd)(sparkSession)
Review Comment:
Let's add this util method to `HoodieSchemaUtils`.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -214,4 +215,26 @@ trait SparkAdapter extends Serializable {
* Tries to translate a Catalyst Expression into data source Filter
*/
def translateFilter(predicate: Expression, supportNestedPredicatePushdown:
Boolean = false): Option[Filter]
+
+ /**
+ * SPARK-44353 StructType#toAttributes was removed in Spark 3.5.0
+ * Use DataTypeUtils#toAttributes for Spark 3.5+
+ */
+ def toAttributes(struct: StructType): Seq[Attribute]
+
+ /**
+ * SPARK-43039 FileIndex#PartitionDirectory refactored in Spark 3.5.0
+ */
+ def toFileStatuses(partitionDirs: Seq[PartitionDirectory]): Seq[FileStatus]
+
+ /**
+ * SPARK-43039 FileIndex#PartitionDirectory refactored in Spark 3.5.0
+ */
+ def newPartitionDirectory(internalRow: InternalRow, statuses:
Seq[FileStatus]): PartitionDirectory
+
+ /**
+ * SPARK-44531 Encoder inference moved elsewhere in Spark 3.5.0
+ * Mainly used for unit tests
+ */
+ def getEncoder(schema: StructType): ExpressionEncoder[Row]
Review Comment:
nit: let's move these util methods to corresponding Util classes as much as
possible.
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -108,6 +119,7 @@ object HoodieAnalysis extends SparkAdapterSupport {
} else {
throw new IllegalStateException("Unsupported Spark version")
}
+ }
Review Comment:
nit: no need of outer brackets.
##########
hudi-spark-datasource/hudi-spark3.4.x/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+import org.apache.hudi.testutils.SparkDatasetTestUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for TestHoodieBulkInsertDataInternalWriter.
+ */
+public class HoodieBulkInsertInternalWriterTestBase extends
HoodieSparkClientTestHarness {
Review Comment:
Looks like this class is added for every Spark version. Is this due to
a rebase miss?
##########
.github/workflows/bot.yml:
##########
@@ -254,9 +261,12 @@ jobs:
strategy:
matrix:
include:
+# - flinkProfile: 'flink1.17'
+# sparkProfile: 'spark3.4'
+# sparkRuntime: 'spark3.4.0'
Review Comment:
Let's uncomment these test combinations once the changes are ready.
##########
hudi-spark-datasource/hudi-spark3.0.x/pom.xml:
##########
@@ -157,6 +157,14 @@
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark30.version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
Review Comment:
Similar here and in the other poms.
##########
hudi-spark-datasource/hudi-spark2/pom.xml:
##########
@@ -197,6 +197,14 @@
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark2.version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
Review Comment:
Any reason for adding this?
##########
hudi-spark-datasource/hudi-spark3.0.x/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+import org.apache.hudi.testutils.SparkDatasetTestUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for TestHoodieBulkInsertDataInternalWriter.
+ */
+public class HoodieBulkInsertInternalWriterTestBase extends
HoodieSparkClientTestHarness {
Review Comment:
Is this a new class you added?
##########
packaging/hudi-utilities-slim-bundle/pom.xml:
##########
@@ -109,6 +109,8 @@
<include>com.github.davidmoten:guava-mini</include>
<include>com.github.davidmoten:hilbert-curve</include>
+ <!-- SPARK-43489 Spark 3.5+ has marked protobuf as provided
-->
+ <include>com.google.protobuf:protobuf-java</include>
Review Comment:
This should not be included, as the `hudi-utilities-slim-bundle` should be
used with `hudi-spark-bundle`, which contains
`com.google.protobuf:protobuf-java`?
##########
hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java:
##########
@@ -107,6 +108,7 @@ public void testClusteringPlanMultipleInstants() throws
Exception {
// replacecommit.inflight doesn't have clustering plan.
// Verify that getClusteringPlan fetches content from corresponding
requested file.
+ @Disabled("Will fail due to avro issue AVRO-3789. This is fixed in avro
1.11.3")
Review Comment:
Does this fail on master, or only for Spark 3.5?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]