This is an automated email from the ASF dual-hosted git repository.
felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ee75c91741 Include iceberg module in spotless plugin and apply formatting fixes
ee75c91741 is described below
commit ee75c91741b045d4ad1b45f37a91962601d08ea8
Author: PHILO-HE <[email protected]>
AuthorDate: Wed Nov 5 09:59:08 2025 +0800
Include iceberg module in spotless plugin and apply formatting fixes
The backends-clickhouse module has its own "includes" setting for Scala
code, which overrides the setting in the parent pom. The code under
"src-iceberg" should therefore be listed there as well.
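A minimal sketch of why the child list must repeat the entries (the pom
snippets below are illustrative, not the actual Gluten poms): Maven does
not merge list-valued plugin configuration by default, so an <includes>
block declared in a module replaces the parent's list entirely unless
combine.children="append" is used.

    <!-- parent pom: default spotless scala includes -->
    <includes>
      <include>src/main/scala/**/*.scala</include>
      <include>src/test/scala/**/*.scala</include>
    </includes>

    <!-- child module (e.g. backends-clickhouse): this list REPLACES the
         parent's, so each extra source root such as src-iceberg has to be
         named here explicitly -->
    <includes>
      <include>src/main/scala/**/*.scala</include>
      <include>src-iceberg/main/scala/**/*.scala</include>
      <include>src-iceberg/test/scala/**/*.scala</include>
    </includes>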
---
backends-clickhouse/pom.xml | 2 +
.../ClickHouseIcebergHiveTableSupport.scala | 22 +-
...ClickHouseIcebergMOREqualityDeletionSuite.scala | 14 +-
.../execution/iceberg/ClickHouseIcebergSuite.scala | 657 ++++++++++-----------
4 files changed, 329 insertions(+), 366 deletions(-)
diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index b1ca75609e..ceef10d22d 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -417,6 +417,8 @@
<include>src-delta/test/scala/**/*.scala</include>
<include>src-delta${delta.binary.version}/main/scala/**/*.scala</include>
<include>src-delta${delta.binary.version}/test/scala/**/*.scala</include>
+ <include>src-iceberg/main/scala/**/*.scala</include>
+ <include>src-iceberg/test/scala/**/*.scala</include>
</includes>
<excludes>
<exclude>src-delta${delta.binary.version}/main/scala/org/apache/spark/sql/delta/commands/*.scala</exclude>
diff --git a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
index f17274cf98..efd26a8572 100644
--- a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
+++ b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
@@ -14,17 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gluten.execution.iceberg
-import com.google.common.base.Strings
-
import org.apache.gluten.config.GlutenConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
+import com.google.common.base.Strings
+
class ClickHouseIcebergHiveTableSupport {
private val sparkConf: SparkConf = new SparkConf()
@@ -61,7 +60,8 @@ class ClickHouseIcebergHiveTableSupport {
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.setCHConfig("use_local_format", true)
- .set("spark.sql.extensions",
+ .set(
+ "spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hive")
@@ -70,7 +70,8 @@ class ClickHouseIcebergHiveTableSupport {
sparkConf.set("spark.hadoop.hive.metastore.uris", url)
}
if (!Strings.isNullOrEmpty(catalog)) {
-      sparkConf.set("spark.sql.catalog." + catalog, "org.apache.iceberg.spark.SparkCatalog")
+      sparkConf
+        .set("spark.sql.catalog." + catalog, "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog." + catalog + ".type", "hive")
}
if (!Strings.isNullOrEmpty(path)) {
@@ -81,12 +82,11 @@ class ClickHouseIcebergHiveTableSupport {
def initializeSession(): Unit = {
if (_hiveSpark == null) {
- _hiveSpark =
- SparkSession
- .builder()
- .config(sparkConf)
- .enableHiveSupport()
- .getOrCreate()
+ _hiveSpark = SparkSession
+ .builder()
+ .config(sparkConf)
+ .enableHiveSupport()
+ .getOrCreate()
}
}
diff --git a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala
index 742827f7bf..1335ef0cc7 100644
--- a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala
+++ b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala
@@ -16,10 +16,12 @@
*/
package org.apache.gluten.execution.iceberg
-import org.apache.commons.io.FileUtils
import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, IcebergScanTransformer}
+
import org.apache.spark.SparkConf
+import org.apache.commons.io.FileUtils
+
import java.io.File
class ClickHouseIcebergMOREqualityDeletionSuite extends GlutenClickHouseWholeStageTransformerSuite {
@@ -58,15 +60,13 @@ class ClickHouseIcebergMOREqualityDeletionSuite extends GlutenClickHouseWholeSta
FileUtils.copyDirectory(equalityDeletedData, icebergPathDir)
}
- testWithSpecifiedSparkVersion(
- "iceberg read mor table with equality deletion", "3.3", "3.5") {
+  testWithSpecifiedSparkVersion("iceberg read mor table with equality deletion", "3.3", "3.5") {
    // The table 'test_upsert_query' was generated by Flink + Iceberg from the iceberg ut,
// the root path must be the '/tmp/junit6640909127060857423/default'
val testTableName = "local.db.test_upsert_query"
- runQueryAndCompare(
- s"""
- |select * from $testTableName;
- |""".stripMargin) {
+ runQueryAndCompare(s"""
+ |select * from $testTableName;
+ |""".stripMargin) {
checkGlutenOperatorMatch[IcebergScanTransformer]
}
}
diff --git a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
index 95de2e0414..5dc02d02ab 100644
--- a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
+++ b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.execution.iceberg
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, IcebergScanTransformer}
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
@@ -41,35 +42,31 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
test("iceberg transformer exists") {
withTable("iceberg_tb") {
- spark.sql(
- """
- |create table iceberg_tb using iceberg as
- |(select 1 as col1, 2 as col2, 3 as col3)
- |""".stripMargin)
-
- runQueryAndCompare(
- """
- |select * from iceberg_tb;
- |""".stripMargin) {
+ spark.sql("""
+ |create table iceberg_tb using iceberg as
+ |(select 1 as col1, 2 as col2, 3 as col3)
+ |""".stripMargin)
+
+ runQueryAndCompare("""
+ |select * from iceberg_tb;
+ |""".stripMargin) {
checkGlutenOperatorMatch[IcebergScanTransformer]
}
}
}
- testWithSpecifiedSparkVersion(
- "iceberg bucketed join", "3.3", "3.5") {
+ testWithSpecifiedSparkVersion("iceberg bucketed join", "3.3", "3.5") {
val leftTable = "p_str_tb"
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
- spark.sql(
- s"""
- |create table $leftTable(id int, name string, p string)
- |using iceberg
- |partitioned by (bucket(4, id));
- |""".stripMargin)
+ spark.sql(s"""
+ |create table $leftTable(id int, name string, p string)
+ |using iceberg
+ |partitioned by (bucket(4, id));
+ |""".stripMargin)
spark.sql(
s"""
|insert into table $leftTable values
@@ -87,12 +84,11 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
GlutenConfig.GLUTEN_ENABLED.key -> "false"
) {
// Gluten does not support write iceberg table.
- spark.sql(
- s"""
- |create table $rightTable(id int, name string, p int)
- |using iceberg
- |partitioned by (bucket(4, id));
- |""".stripMargin)
+ spark.sql(s"""
+ |create table $rightTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id));
+ |""".stripMargin)
spark.sql(
s"""
|insert into table $rightTable values
@@ -113,46 +109,44 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
"spark.sql.autoBroadcastJoinThreshold" -> "-1",
"spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
) {
- runQueryAndCompare(
- s"""
- |select s.id, s.name, i.name, i.p
- | from $leftTable s inner join $rightTable i
- | on s.id = i.id;
- |""".stripMargin) {
- df => {
- assert(
- getExecutedPlan(df).count(
- plan => {
- plan.isInstanceOf[IcebergScanTransformer]
- }) == 2)
- getExecutedPlan(df).map {
- case plan if plan.isInstanceOf[IcebergScanTransformer] =>
- assert(
-              plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-            assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
- case _ => // do nothing
+ runQueryAndCompare(s"""
+ |select s.id, s.name, i.name, i.p
+ | from $leftTable s inner join $rightTable i
+ | on s.id = i.id;
+ |""".stripMargin) {
+ df =>
+ {
+ assert(
+ getExecutedPlan(df).count(
+ plan => {
+ plan.isInstanceOf[IcebergScanTransformer]
+ }) == 2)
+ getExecutedPlan(df).map {
+ case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+ assert(
+                  plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+              assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+ case _ => // do nothing
+ }
+ checkLengthAndPlan(df, 7)
}
- checkLengthAndPlan(df, 7)
- }
}
}
}
}
- testWithSpecifiedSparkVersion(
- "iceberg bucketed join with partition", "3.3", "3.5") {
+  testWithSpecifiedSparkVersion("iceberg bucketed join with partition", "3.3", "3.5") {
val leftTable = "p_str_tb"
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
- spark.sql(
- s"""
- |create table $leftTable(id int, name string, p int)
- |using iceberg
- |partitioned by (bucket(4, id), p);
- |""".stripMargin)
+ spark.sql(s"""
+ |create table $leftTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id), p);
+ |""".stripMargin)
spark.sql(
s"""
|insert into table $leftTable values
@@ -170,12 +164,11 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
GlutenConfig.GLUTEN_ENABLED.key -> "false"
) {
// Gluten does not support write iceberg table.
- spark.sql(
- s"""
- |create table $rightTable(id int, name string, p int)
- |using iceberg
- |partitioned by (bucket(4, id), p);
- |""".stripMargin)
+ spark.sql(s"""
+ |create table $rightTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id), p);
+ |""".stripMargin)
spark.sql(
s"""
|insert into table $rightTable values
@@ -196,27 +189,27 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
"spark.sql.autoBroadcastJoinThreshold" -> "-1",
"spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
) {
- runQueryAndCompare(
- s"""
- |select s.id, s.name, i.name, i.p
- | from $leftTable s inner join $rightTable i
- | on s.id = i.id and s.p = i.p;
- |""".stripMargin) {
- df => {
- assert(
- getExecutedPlan(df).count(
- plan => {
- plan.isInstanceOf[IcebergScanTransformer]
- }) == 2)
- getExecutedPlan(df).map {
- case plan if plan.isInstanceOf[IcebergScanTransformer] =>
- assert(
-              plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-            assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
- case _ => // do nothing
+ runQueryAndCompare(s"""
+ |select s.id, s.name, i.name, i.p
+ | from $leftTable s inner join $rightTable i
+ | on s.id = i.id and s.p = i.p;
+ |""".stripMargin) {
+ df =>
+ {
+ assert(
+ getExecutedPlan(df).count(
+ plan => {
+ plan.isInstanceOf[IcebergScanTransformer]
+ }) == 2)
+ getExecutedPlan(df).map {
+ case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+ assert(
+                  plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+              assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+ case _ => // do nothing
+ }
+ checkLengthAndPlan(df, 7)
}
- checkLengthAndPlan(df, 7)
- }
}
}
}
@@ -228,12 +221,11 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
withTable(leftTable, rightTable) {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
- spark.sql(
- s"""
- |create table $leftTable(id int, name string, p string)
- |using iceberg
- |partitioned by (bucket(4, id));
- |""".stripMargin)
+ spark.sql(s"""
+ |create table $leftTable(id int, name string, p string)
+ |using iceberg
+ |partitioned by (bucket(4, id));
+ |""".stripMargin)
spark.sql(
s"""
|insert into table $leftTable values
@@ -253,12 +245,11 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
|(10, 'a4', 'p3');
|""".stripMargin
)
- spark.sql(
- s"""
- |create table $rightTable(id int, name string, p int)
- |using iceberg
- |partitioned by (bucket(4, id));
- |""".stripMargin)
+ spark.sql(s"""
+ |create table $rightTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id));
+ |""".stripMargin)
spark.sql(
s"""
|insert into table $rightTable values
@@ -277,43 +268,43 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
"spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true",
"spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled" ->
"false"
) {
- runQueryAndCompare(
- s"""
- |select s.id, s.name, i.name, i.p
- | from $leftTable s inner join $rightTable i
- | on s.id = i.id;
- |""".stripMargin) {
- df => {
- assert(
- getExecutedPlan(df).count(
- plan => {
- plan.isInstanceOf[IcebergScanTransformer]
- }) == 2)
- getExecutedPlan(df).map {
- case plan: IcebergScanTransformer =>
- assert(plan.getKeyGroupPartitioning.isDefined)
- assert(plan.getSplitInfos.length == 3)
- case _ => // do nothing
+ runQueryAndCompare(s"""
+ |select s.id, s.name, i.name, i.p
+ | from $leftTable s inner join $rightTable i
+ | on s.id = i.id;
+ |""".stripMargin) {
+ df =>
+ {
+ assert(
+ getExecutedPlan(df).count(
+ plan => {
+ plan.isInstanceOf[IcebergScanTransformer]
+ }) == 2)
+ getExecutedPlan(df).map {
+ case plan: IcebergScanTransformer =>
+ assert(plan.getKeyGroupPartitioning.isDefined)
+ assert(plan.getSplitInfos.length == 3)
+ case _ => // do nothing
+ }
}
- }
}
}
}
}
testWithMinSparkVersion(
-    "iceberg bucketed join partition value not exists partial cluster", "3.4") {
+ "iceberg bucketed join partition value not exists partial cluster",
+ "3.4") {
val leftTable = "p_str_tb"
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
- spark.sql(
- s"""
- |create table $leftTable(id int, name string, p string)
- |using iceberg
- |partitioned by (bucket(4, id));
- |""".stripMargin)
+ spark.sql(s"""
+ |create table $leftTable(id int, name string, p string)
+ |using iceberg
+ |partitioned by (bucket(4, id));
+ |""".stripMargin)
spark.sql(
s"""
|insert into table $leftTable values
@@ -333,12 +324,11 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
|(10, 'a4', 'p3');
|""".stripMargin
)
- spark.sql(
- s"""
- |create table $rightTable(id int, name string, p int)
- |using iceberg
- |partitioned by (bucket(4, id));
- |""".stripMargin)
+ spark.sql(s"""
+ |create table $rightTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id));
+ |""".stripMargin)
spark.sql(
s"""
|insert into table $rightTable values
@@ -357,44 +347,42 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
"spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true",
"spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled" ->
"true"
) {
- runQueryAndCompare(
- s"""
- |select s.id, s.name, i.name, i.p
- | from $leftTable s inner join $rightTable i
- | on s.id = i.id;
- |""".stripMargin) {
- df => {
- assert(
- getExecutedPlan(df).count(
- plan => {
- plan.isInstanceOf[IcebergScanTransformer]
- }) == 2)
- getExecutedPlan(df).map {
- case plan: IcebergScanTransformer =>
- assert(plan.getKeyGroupPartitioning.isDefined)
- assert(plan.getSplitInfos.length == 3)
- case _ => // do nothing
+ runQueryAndCompare(s"""
+ |select s.id, s.name, i.name, i.p
+ | from $leftTable s inner join $rightTable i
+ | on s.id = i.id;
+ |""".stripMargin) {
+ df =>
+ {
+ assert(
+ getExecutedPlan(df).count(
+ plan => {
+ plan.isInstanceOf[IcebergScanTransformer]
+ }) == 2)
+ getExecutedPlan(df).map {
+ case plan: IcebergScanTransformer =>
+ assert(plan.getKeyGroupPartitioning.isDefined)
+ assert(plan.getSplitInfos.length == 3)
+ case _ => // do nothing
+ }
}
- }
}
}
}
}
- testWithSpecifiedSparkVersion(
- "iceberg bucketed join with partition filter", "3.3", "3.5") {
+  testWithSpecifiedSparkVersion("iceberg bucketed join with partition filter", "3.3", "3.5") {
val leftTable = "p_str_tb"
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
- spark.sql(
- s"""
- |create table $leftTable(id int, name string, p int)
- |using iceberg
- |partitioned by (bucket(4, id), p);
- |""".stripMargin)
+ spark.sql(s"""
+ |create table $leftTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id), p);
+ |""".stripMargin)
spark.sql(
s"""
|insert into table $leftTable values
@@ -412,12 +400,11 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
GlutenConfig.GLUTEN_ENABLED.key -> "false"
) {
// Gluten does not support write iceberg table.
- spark.sql(
- s"""
- |create table $rightTable(id int, name string, p int)
- |using iceberg
- |partitioned by (bucket(4, id), p);
- |""".stripMargin)
+ spark.sql(s"""
+ |create table $rightTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id), p);
+ |""".stripMargin)
spark.sql(
s"""
|insert into table $rightTable values
@@ -438,28 +425,28 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
"spark.sql.autoBroadcastJoinThreshold" -> "-1",
"spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
) {
- runQueryAndCompare(
- s"""
- |select s.id, s.name, i.name, i.p
- | from $leftTable s inner join $rightTable i
- | on s.id = i.id
- | where s.p = 1 and i.p = 1;
- |""".stripMargin) {
- df => {
- assert(
- getExecutedPlan(df).count(
- plan => {
- plan.isInstanceOf[IcebergScanTransformer]
- }) == 2)
- getExecutedPlan(df).map {
- case plan if plan.isInstanceOf[IcebergScanTransformer] =>
- assert(
-              plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-            assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
- case _ => // do nothing
+ runQueryAndCompare(s"""
+ |select s.id, s.name, i.name, i.p
+ | from $leftTable s inner join $rightTable i
+ | on s.id = i.id
+ | where s.p = 1 and i.p = 1;
+ |""".stripMargin) {
+ df =>
+ {
+ assert(
+ getExecutedPlan(df).count(
+ plan => {
+ plan.isInstanceOf[IcebergScanTransformer]
+ }) == 2)
+ getExecutedPlan(df).map {
+ case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+ assert(
+                  plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+              assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
+ case _ => // do nothing
+ }
+ checkLengthAndPlan(df, 5)
}
- checkLengthAndPlan(df, 5)
- }
}
}
}
@@ -467,18 +454,15 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
test("iceberg: time travel") {
withTable("iceberg_tm") {
- spark.sql(
- s"""
- |create table iceberg_tm (id int, name string) using iceberg
- |""".stripMargin)
- spark.sql(
- s"""
- |insert into iceberg_tm values (1, "v1"), (2, "v2")
- |""".stripMargin)
- spark.sql(
- s"""
- |insert into iceberg_tm values (3, "v3"), (4, "v4")
- |""".stripMargin)
+ spark.sql(s"""
+ |create table iceberg_tm (id int, name string) using iceberg
+ |""".stripMargin)
+ spark.sql(s"""
+ |insert into iceberg_tm values (1, "v1"), (2, "v2")
+ |""".stripMargin)
+ spark.sql(s"""
+ |insert into iceberg_tm values (3, "v3"), (4, "v4")
+ |""".stripMargin)
val df =
spark.sql("select snapshot_id from default.iceberg_tm.snapshots where
parent_id is null")
@@ -492,15 +476,13 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
test("iceberg: partition filters") {
withTable("iceberg_pf") {
- spark.sql(
- s"""
- |create table iceberg_pf (id int, name string)
- | using iceberg partitioned by (name)
- |""".stripMargin)
- spark.sql(
- s"""
-        |insert into iceberg_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
- |""".stripMargin)
+ spark.sql(s"""
+ |create table iceberg_pf (id int, name string)
+ | using iceberg partitioned by (name)
+ |""".stripMargin)
+ spark.sql(s"""
+                |insert into iceberg_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
+ |""".stripMargin)
      val df1 = runQueryAndCompare("select * from iceberg_pf where name = 'v1'") { _ => }
checkLengthAndPlan(df1, 2)
checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
@@ -510,26 +492,24 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
test("iceberg read cow table - delete and update") {
withTable("iceberg_cow_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
- spark.sql(
- """
- |create table iceberg_cow_tb (
- | id int,
- | name string,
- | p string
- |) using iceberg
- |tblproperties (
- | 'format-version' = '2'
- |)
- |partitioned by (p);
- |""".stripMargin)
+ spark.sql("""
+ |create table iceberg_cow_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
// Insert some test rows.
- spark.sql(
- """
- |insert into table iceberg_cow_tb
- |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
- | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
- |""".stripMargin)
+ spark.sql("""
+ |insert into table iceberg_cow_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+ | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+ |""".stripMargin)
// Delete row.
spark.sql(
@@ -550,10 +530,9 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
|""".stripMargin
)
}
- runQueryAndCompare(
- """
- |select * from iceberg_cow_tb;
- |""".stripMargin) {
+ runQueryAndCompare("""
+ |select * from iceberg_cow_tb;
+ |""".stripMargin) {
checkGlutenOperatorMatch[IcebergScanTransformer]
}
}
@@ -562,29 +541,27 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
test("iceberg read mor table - delete and update") {
withTable("iceberg_mor_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
- spark.sql(
- """
- |create table iceberg_mor_tb (
- | id int,
- | name string,
- | p string
- |) using iceberg
- |tblproperties (
- | 'format-version' = '2',
- | 'write.delete.mode' = 'merge-on-read',
- | 'write.update.mode' = 'merge-on-read',
- | 'write.merge.mode' = 'merge-on-read'
- |)
- |partitioned by (p);
- |""".stripMargin)
+ spark.sql("""
+ |create table iceberg_mor_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2',
+ | 'write.delete.mode' = 'merge-on-read',
+ | 'write.update.mode' = 'merge-on-read',
+ | 'write.merge.mode' = 'merge-on-read'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
// Insert some test rows.
- spark.sql(
- """
- |insert into table iceberg_mor_tb
- |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
- | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
- |""".stripMargin)
+ spark.sql("""
+ |insert into table iceberg_mor_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+ | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+ |""".stripMargin)
// Delete row.
spark.sql(
@@ -605,10 +582,9 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
|""".stripMargin
)
}
- runQueryAndCompare(
- """
- |select * from iceberg_mor_tb;
- |""".stripMargin) {
+ runQueryAndCompare("""
+ |select * from iceberg_mor_tb;
+ |""".stripMargin) {
checkGlutenOperatorMatch[IcebergScanTransformer]
}
}
@@ -617,39 +593,35 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
test("iceberg read cow table - merge into") {
withTable("iceberg_cow_tb", "merge_into_source_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
- spark.sql(
- """
- |create table iceberg_cow_tb (
- | id int,
- | name string,
- | p string
- |) using iceberg
- |tblproperties (
- | 'format-version' = '2'
- |)
- |partitioned by (p);
- |""".stripMargin)
- spark.sql(
- """
- |create table merge_into_source_tb (
- | id int,
- | name string,
- | p string
- |) using iceberg;
- |""".stripMargin)
+ spark.sql("""
+ |create table iceberg_cow_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
+ spark.sql("""
+ |create table merge_into_source_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg;
+ |""".stripMargin)
// Insert some test rows.
- spark.sql(
- """
- |insert into table iceberg_cow_tb
- |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
- |""".stripMargin)
- spark.sql(
- """
- |insert into table merge_into_source_tb
- |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
- | (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
- |""".stripMargin)
+ spark.sql("""
+ |insert into table iceberg_cow_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+ |""".stripMargin)
+ spark.sql("""
+ |insert into table merge_into_source_tb
+                |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
+ | (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
+ |""".stripMargin)
// Delete row.
spark.sql(
@@ -677,10 +649,9 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
|""".stripMargin
)
}
- runQueryAndCompare(
- """
- |select * from iceberg_cow_tb;
- |""".stripMargin) {
+ runQueryAndCompare("""
+ |select * from iceberg_cow_tb;
+ |""".stripMargin) {
checkGlutenOperatorMatch[IcebergScanTransformer]
}
}
@@ -689,42 +660,38 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
test("iceberg read mor table - merge into with merge-on-read mode") {
withTable("iceberg_mor_tb", "merge_into_source_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
- spark.sql(
- """
- |create table iceberg_mor_tb (
- | id int,
- | name string,
- | p string
- |) using iceberg
- |tblproperties (
- | 'format-version' = '2',
- | 'write.delete.mode' = 'merge-on-read',
- | 'write.update.mode' = 'merge-on-read',
- | 'write.merge.mode' = 'merge-on-read'
- |)
- |partitioned by (p);
- |""".stripMargin)
- spark.sql(
- """
- |create table merge_into_source_tb (
- | id int,
- | name string,
- | p string
- |) using iceberg;
- |""".stripMargin)
+ spark.sql("""
+ |create table iceberg_mor_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2',
+ | 'write.delete.mode' = 'merge-on-read',
+ | 'write.update.mode' = 'merge-on-read',
+ | 'write.merge.mode' = 'merge-on-read'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
+ spark.sql("""
+ |create table merge_into_source_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg;
+ |""".stripMargin)
// Insert some test rows.
- spark.sql(
- """
- |insert into table iceberg_mor_tb
- |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
- |""".stripMargin)
- spark.sql(
- """
- |insert into table merge_into_source_tb
- |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
- | (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
- |""".stripMargin)
+ spark.sql("""
+ |insert into table iceberg_mor_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+ |""".stripMargin)
+ spark.sql("""
+ |insert into table merge_into_source_tb
+                |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
+ | (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
+ |""".stripMargin)
// Delete row.
spark.sql(
@@ -752,10 +719,9 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
|""".stripMargin
)
}
- runQueryAndCompare(
- """
- |select * from iceberg_mor_tb;
- |""".stripMargin) {
+ runQueryAndCompare("""
+ |select * from iceberg_mor_tb;
+ |""".stripMargin) {
checkGlutenOperatorMatch[IcebergScanTransformer]
}
}
@@ -771,23 +737,21 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
"spark.sql.iceberg.handle-timestamp-without-timezone" -> flag,
"spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables" ->
flag) {
withTable("part_by_timestamp") {
- spark.sql(
- """
- |create table part_by_timestamp (
- | p timestamp
- |) using iceberg
- |tblproperties (
- | 'format-version' = '1'
- |)
- |partitioned by (p);
- |""".stripMargin)
+ spark.sql("""
+ |create table part_by_timestamp (
+ | p timestamp
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '1'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
// Insert some test rows.
- spark.sql(
- """
- |insert into table part_by_timestamp
- |values (TIMESTAMP '2022-01-01 00:01:20');
- |""".stripMargin)
+ spark.sql("""
+ |insert into table part_by_timestamp
+ |values (TIMESTAMP '2022-01-01 00:01:20');
+ |""".stripMargin)
val df = spark.sql("select * from part_by_timestamp")
          checkAnswer(df, Row(java.sql.Timestamp.valueOf("2022-01-01 00:01:20")) :: Nil)
}
@@ -798,24 +762,21 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
test("test read v1 iceberg with partition drop") {
val testTable = "test_table_with_partition"
withTable(testTable) {
- spark.sql(
- s"""
- |CREATE TABLE $testTable (id INT, data STRING, p1 STRING, p2 STRING)
- |USING iceberg
- |tblproperties (
- | 'format-version' = '1'
- |)
- |PARTITIONED BY (p1, p2);
- |""".stripMargin)
- spark.sql(
- s"""
- |INSERT INTO $testTable VALUES
- |(1, 'test_data', 'test_p1', 'test_p2');
- |""".stripMargin)
- spark.sql(
- s"""
- |ALTER TABLE $testTable DROP PARTITION FIELD p2
- |""".stripMargin)
+ spark.sql(s"""
+                 |CREATE TABLE $testTable (id INT, data STRING, p1 STRING, p2 STRING)
+ |USING iceberg
+ |tblproperties (
+ | 'format-version' = '1'
+ |)
+ |PARTITIONED BY (p1, p2);
+ |""".stripMargin)
+ spark.sql(s"""
+ |INSERT INTO $testTable VALUES
+ |(1, 'test_data', 'test_p1', 'test_p2');
+ |""".stripMargin)
+ spark.sql(s"""
+ |ALTER TABLE $testTable DROP PARTITION FIELD p2
+ |""".stripMargin)
val resultDf = spark.sql(s"SELECT id, data, p1, p2 FROM $testTable")
val result = resultDf.collect()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]