xushiyan commented on code in PR #8641: URL: https://github.com/apache/hudi/pull/8641#discussion_r1199618043
########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDagExecutionDataSource.scala: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hadoop.fs.FileSystem +import org.apache.hudi.HoodieConversionUtils.toJavaOption +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.common.util +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.testutils.HoodieSparkClientTestBase +import org.apache.hudi.util.JFunction +import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, ScalaAssertionSupport} +import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted} +import org.apache.spark.sql._ +import org.apache.spark.sql.hudi.HoodieSparkSessionExtension +import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.params.ParameterizedTest +import 
org.junit.jupiter.params.provider.ValueSource + +import java.util.function.Consumer +import scala.collection.JavaConversions._ + +/** + * Tests around Dag execution for Spark DataSource. + */ +class TestDagExecutionDataSource extends HoodieSparkClientTestBase with ScalaAssertionSupport { + var spark: SparkSession = null + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + "hoodie.delete.shuffle.parallelism" -> "1", + HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1" + ) + val sparkOpts = Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName) + + val verificationCol: String = "driver" + val updatedVerificationVal: String = "driver_update" + + override def getSparkSessionExtensionsInjector: util.Option[Consumer[SparkSessionExtensions]] = + toJavaOption( + Some( + JFunction.toJavaConsumer((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver))) + ) + + @BeforeEach + override def setUp() { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach + override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + FileSystem.closeAll() + System.gc() + } + + /** + * Validates that clustering dag is triggered only once. + * We leverage spark event listener to validate it. 
+ */ + @Test + def testValidateClusteringForRepeatedDag(): Unit = { + // register stage event listeners + val sm = new StageEventManager("org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering") + spark.sparkContext.addSparkListener(sm) + + var structType: StructType = null + for (i <- 1 to 2) { + val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + structType = inputDF.schema + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .option("hoodie.metadata.enable", "false") + .mode(if (i == 1) SaveMode.Overwrite else SaveMode.Append) + .save(basePath) + } + + // trigger clustering. + val records = recordsToStrings(dataGen.generateInserts("%05d".format(4), 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + structType = inputDF.schema + inputDF.write.format("hudi") + .options(commonOpts) + .option("hoodie.cleaner.commits.retained", "0") + .option("hoodie.parquet.small.file.limit", "0") + .option("hoodie.clustering.inline", "true") + .option("hoodie.clustering.inline.max.commits", "2") + .option("hoodie.metadata.enable", "false") + .mode(SaveMode.Append) + .save(basePath) + + // verify that clustering is not trigered more than once. + assertEquals(sm.triggerCount, 1) + } + + /** + * Validates that bulk insert dag is triggered only once. + * We leverage spark event listener to validate it. 
+ */ + @Test + def testValidateBulkInsertForRepeatedDag(): Unit = { + // register stage event listeners + val sm = new StageEventManager("org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow") + spark.sparkContext.addSparkListener(sm) + + var structType: StructType = null + val records = recordsToStrings(dataGen.generateInserts("%05d".format(1), 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + structType = inputDF.schema + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .option("hoodie.metadata.enable", "false") + .mode(SaveMode.Overwrite) + .save(basePath) + + // verify that bulk insert is not trigered more than once. + assertEquals(sm.triggerCount, 1) + } + + /** + * Validates that insert and upsert dag is triggered only once. + * We leverage spark event listener to validate it. + */ + @ParameterizedTest + @ValueSource(strings = Array("upsert", "insert")) + def testValidateInsertAndUpsertForRepeatedDag(operation: String): Unit = { + // register stage event listeners + val sm = new StageEventManager("org.apache.hudi.client.SparkRDDWriteClient.commit") + spark.sparkContext.addSparkListener(sm) + + var structType: StructType = null + val records = recordsToStrings(dataGen.generateInserts("%05d".format(1), 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + structType = inputDF.schema + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, operation) + .option("hoodie.metadata.enable", "false") + .mode(SaveMode.Overwrite) + .save(basePath) + + // verify that operation is not trigered more than once. 
+ assertEquals(sm.triggerCount, 1) Review Comment: ditto ########## hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestDagExecutionDeltaStreamer.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities.deltastreamer; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.sources.ParquetDFSSource; + +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestDagExecutionDeltaStreamer extends HoodieDeltaStreamerTestBase { Review Comment: ```suggestion public class TestHoodieDeltaStreamerDagExecution extends HoodieDeltaStreamerTestBase { ``` ########## hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java: ########## @@ -370,6 +375,28 @@ protected static void prepareORCDFSFiles(int numRecords, String baseORCPath, Str } } + static List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, + String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) { + List<String> configs = new ArrayList<>(); + configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords)); + if (!StringUtils.isNullOrEmpty(autoClean)) { Review Comment: StringUtils.nonEmpty() ########## hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestDagExecutionDeltaStreamer.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.deltastreamer; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.sources.ParquetDFSSource; + +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestDagExecutionDeltaStreamer extends HoodieDeltaStreamerTestBase { + + @Test + public void testInsertForRepeatedDag() throws Exception { + // Configure 3 transformers of same type. 2nd transformer has no suffix + StageEventManager sm = new StageEventManager("org.apache.hudi.client.BaseHoodieClient.finalizeWrite"); + sparkSession.sparkContext().addSparkListener(sm); + runDeltaStreamer(WriteOperationType.INSERT, false, Option.empty()); + assertEquals(sm.triggerCount, 1); + } + + @Test + public void testUpsertForRepeatedDag() throws Exception { + // Configure 3 transformers of same type. 
2nd transformer has no suffix + StageEventManager sm = new StageEventManager("org.apache.hudi.client.BaseHoodieClient.finalizeWrite"); + sparkSession.sparkContext().addSparkListener(sm); + runDeltaStreamer(WriteOperationType.UPSERT, false, Option.empty()); + assertEquals(sm.triggerCount, 1); + } + + @Test + public void testBulkInsertForRepeatedDag() throws Exception { + // Configure 3 transformers of same type. 2nd transformer has no suffix + StageEventManager sm = new StageEventManager("org.apache.hudi.client.BaseHoodieClient.finalizeWrite"); + sparkSession.sparkContext().addSparkListener(sm); + runDeltaStreamer(WriteOperationType.BULK_INSERT, false, Option.empty()); + assertEquals(sm.triggerCount, 1); + } Review Comment: Can these be a parameterized test? ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDagExecutionDataSource.scala: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.functional + +import org.apache.hadoop.fs.FileSystem +import org.apache.hudi.HoodieConversionUtils.toJavaOption +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.common.util +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.testutils.HoodieSparkClientTestBase +import org.apache.hudi.util.JFunction +import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, ScalaAssertionSupport} +import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted} +import org.apache.spark.sql._ +import org.apache.spark.sql.hudi.HoodieSparkSessionExtension +import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.function.Consumer +import scala.collection.JavaConversions._ + +/** + * Tests around Dag execution for Spark DataSource. 
+ */ +class TestDagExecutionDataSource extends HoodieSparkClientTestBase with ScalaAssertionSupport { + var spark: SparkSession = null + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + "hoodie.delete.shuffle.parallelism" -> "1", + HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1" + ) + val sparkOpts = Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName) + + val verificationCol: String = "driver" + val updatedVerificationVal: String = "driver_update" + + override def getSparkSessionExtensionsInjector: util.Option[Consumer[SparkSessionExtensions]] = + toJavaOption( + Some( + JFunction.toJavaConsumer((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver))) + ) + + @BeforeEach + override def setUp() { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach + override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + FileSystem.closeAll() + System.gc() + } + + /** + * Validates that clustering dag is triggered only once. + * We leverage spark event listener to validate it. 
+ */ + @Test + def testValidateClusteringForRepeatedDag(): Unit = { + // register stage event listeners + val sm = new StageEventManager("org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering") + spark.sparkContext.addSparkListener(sm) + + var structType: StructType = null + for (i <- 1 to 2) { + val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + structType = inputDF.schema + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .option("hoodie.metadata.enable", "false") + .mode(if (i == 1) SaveMode.Overwrite else SaveMode.Append) + .save(basePath) + } + + // trigger clustering. + val records = recordsToStrings(dataGen.generateInserts("%05d".format(4), 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + structType = inputDF.schema + inputDF.write.format("hudi") + .options(commonOpts) + .option("hoodie.cleaner.commits.retained", "0") + .option("hoodie.parquet.small.file.limit", "0") + .option("hoodie.clustering.inline", "true") + .option("hoodie.clustering.inline.max.commits", "2") + .option("hoodie.metadata.enable", "false") + .mode(SaveMode.Append) + .save(basePath) + + // verify that clustering is not trigered more than once. + assertEquals(sm.triggerCount, 1) + } + + /** + * Validates that bulk insert dag is triggered only once. + * We leverage spark event listener to validate it. 
+ */ + @Test + def testValidateBulkInsertForRepeatedDag(): Unit = { + // register stage event listeners + val sm = new StageEventManager("org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow") + spark.sparkContext.addSparkListener(sm) + + var structType: StructType = null + val records = recordsToStrings(dataGen.generateInserts("%05d".format(1), 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + structType = inputDF.schema + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .option("hoodie.metadata.enable", "false") + .mode(SaveMode.Overwrite) + .save(basePath) + + // verify that bulk insert is not trigered more than once. + assertEquals(sm.triggerCount, 1) + } + + /** + * Validates that insert and upsert dag is triggered only once. + * We leverage spark event listener to validate it. + */ + @ParameterizedTest + @ValueSource(strings = Array("upsert", "insert")) + def testValidateInsertAndUpsertForRepeatedDag(operation: String): Unit = { + // register stage event listeners + val sm = new StageEventManager("org.apache.hudi.client.SparkRDDWriteClient.commit") + spark.sparkContext.addSparkListener(sm) + + var structType: StructType = null + val records = recordsToStrings(dataGen.generateInserts("%05d".format(1), 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + structType = inputDF.schema + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, operation) + .option("hoodie.metadata.enable", "false") + .mode(SaveMode.Overwrite) + .save(basePath) + + // verify that operation is not trigered more than once. + assertEquals(sm.triggerCount, 1) + } + + /** + * Validates that compaction dag is triggered only once. + * We leverage spark event listener to validate it. 
+ */ + @Test + def testValidateCompactionForRepeatedDag(): Unit = { + // register stage event listeners + val sm = new StageEventManager("org.apache.hudi.table.action.compact.RunCompactionActionExecutor.execute") + spark.sparkContext.addSparkListener(sm) + + var structType: StructType = null + for (i <- 1 to 2) { Review Comment: Why do you need 2 deltacommits here? You just need the first commit and then a subsequent update that triggers compaction? ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDagExecutionDataSource.scala: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.functional + +import org.apache.hadoop.fs.FileSystem +import org.apache.hudi.HoodieConversionUtils.toJavaOption +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.common.util +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.testutils.HoodieSparkClientTestBase +import org.apache.hudi.util.JFunction +import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, ScalaAssertionSupport} +import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted} +import org.apache.spark.sql._ +import org.apache.spark.sql.hudi.HoodieSparkSessionExtension +import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.function.Consumer +import scala.collection.JavaConversions._ + +/** + * Tests around Dag execution for Spark DataSource. 
+ */ +class TestDagExecutionDataSource extends HoodieSparkClientTestBase with ScalaAssertionSupport { + var spark: SparkSession = null + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + "hoodie.delete.shuffle.parallelism" -> "1", + HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1" + ) + val sparkOpts = Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName) + + val verificationCol: String = "driver" + val updatedVerificationVal: String = "driver_update" + + override def getSparkSessionExtensionsInjector: util.Option[Consumer[SparkSessionExtensions]] = + toJavaOption( + Some( + JFunction.toJavaConsumer((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver))) + ) + + @BeforeEach + override def setUp() { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach + override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + FileSystem.closeAll() + System.gc() + } + + /** + * Validates that clustering dag is triggered only once. + * We leverage spark event listener to validate it. 
+ */ + @Test + def testValidateClusteringForRepeatedDag(): Unit = { + // register stage event listeners + val sm = new StageEventManager("org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering") + spark.sparkContext.addSparkListener(sm) + + var structType: StructType = null + for (i <- 1 to 2) { + val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + structType = inputDF.schema + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .option("hoodie.metadata.enable", "false") + .mode(if (i == 1) SaveMode.Overwrite else SaveMode.Append) + .save(basePath) + } + + // trigger clustering. + val records = recordsToStrings(dataGen.generateInserts("%05d".format(4), 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + structType = inputDF.schema + inputDF.write.format("hudi") + .options(commonOpts) + .option("hoodie.cleaner.commits.retained", "0") + .option("hoodie.parquet.small.file.limit", "0") + .option("hoodie.clustering.inline", "true") + .option("hoodie.clustering.inline.max.commits", "2") + .option("hoodie.metadata.enable", "false") + .mode(SaveMode.Append) + .save(basePath) + + // verify that clustering is not trigered more than once. + assertEquals(sm.triggerCount, 1) + } + + /** + * Validates that bulk insert dag is triggered only once. + * We leverage spark event listener to validate it. 
+ */ + @Test + def testValidateBulkInsertForRepeatedDag(): Unit = { + // register stage event listeners + val sm = new StageEventManager("org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow") + spark.sparkContext.addSparkListener(sm) + + var structType: StructType = null + val records = recordsToStrings(dataGen.generateInserts("%05d".format(1), 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + structType = inputDF.schema + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .option("hoodie.metadata.enable", "false") + .mode(SaveMode.Overwrite) + .save(basePath) + + // verify that bulk insert is not trigered more than once. + assertEquals(sm.triggerCount, 1) Review Comment: expected value goes first -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
