This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 556be9428fc [HUDI-7188] Revert "[HUDI-6836] Shutting down deltastreamer in tests and shutting down metrics for write client (#9667)" (#10264)
556be9428fc is described below
commit 556be9428fc19824c2533b76cb6240509799e684
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Dec 6 18:43:05 2023 -0800
[HUDI-7188] Revert "[HUDI-6836] Shutting down deltastreamer in tests and shutting down metrics for write client (#9667)" (#10264)
This reverts commit 594b0ea4abe0bebfaa1ad742877e707d2a39f2d4.
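For context: [HUDI-6836] had made close() stop the metrics reporters after the index and table-service clients are released. The tail of BaseHoodieWriteClient#close() before this revert, sketched from the hunk below (only the names shown there are taken from the code), was:

    this.index.close();
    this.tableServiceClient.close();
    // shutdown metrics: stop reporters so their background threads do not outlive the write client
    if (this.metrics != null && this.metrics.getMetrics() != null) {
      this.metrics.getMetrics().shutdown();
    }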
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 4 -
.../apache/hudi/functional/TestCOWDataSource.scala | 1 -
.../deltastreamer/TestHoodieDeltaStreamer.java | 1204 ++++++++------------
3 files changed, 479 insertions(+), 730 deletions(-)
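Most of the TestHoodieDeltaStreamer churn below removes a try/finally guard around each streamer run. A minimal sketch of the pattern being dropped, assembled from the hunks that follow (cfg and jsc are the test's existing streamer config and Spark context), is:

    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
    try {
      ds.sync();
      // ... assertions against the synced table ...
    } finally {
      // always stop ingestion and metrics threads, even when an assertion fails
      ds.shutdownGracefully();
    }

After the revert the tests go back to the bare call, e.g. new HoodieDeltaStreamer(cfg, jsc).sync().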
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index d94249e08e8..a3aa6699027 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1358,10 +1358,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     // before this point
     this.index.close();
     this.tableServiceClient.close();
-    // shutdown metrics
-    if (this.metrics != null && this.metrics.getMetrics() != null) {
-      this.metrics.getMetrics().shutdown();
-    }
   }
public void setWriteTimer(String commitType) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index c71226d0130..f500ea83120 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -1604,7 +1604,6 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
     assertEquals(false, Metrics.isInitialized(basePath), "Metrics should be shutdown")
-    assertEquals(false, Metrics.isInitialized(basePath + "/.hoodie/metadata"), "Metrics should be shutdown for metadata table")
   }
@ParameterizedTest
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 5ae242234a0..03208a0c0e5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -419,17 +419,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath,
WriteOperationType.UPSERT,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()),
propsFilename, false), jsc);
- try {
- deltaStreamer.sync();
- HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
- .setConf(new Configuration()).setBasePath(tableBasePath).build();
- assertEquals(
- expectedKeyGeneratorClassName,
metaClient.getTableConfig().getKeyGeneratorClassName());
- Dataset<Row> res = sqlContext.read().format("hudi").load(tableBasePath);
- assertEquals(1000, res.count());
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ deltaStreamer.sync();
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setConf(new Configuration()).setBasePath(tableBasePath).build();
+ assertEquals(
+ expectedKeyGeneratorClassName,
metaClient.getTableConfig().getKeyGeneratorClassName());
+ Dataset<Row> res = sqlContext.read().format("hudi").load(tableBasePath);
+ assertEquals(1000, res.count());
}
@Test
@@ -453,16 +449,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.add(HIVE_STYLE_PARTITIONING_ENABLE.key() + "=" + configFlag);
cfg.configs.add(URL_ENCODE_PARTITIONING.key() + "=" + configFlag);
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
- try {
- deltaStreamer.getIngestionService().ingestOnce();
- // Create new metaClient from tablePath
- HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get())
- .setBasePath(tablePath).build();
- assertEquals(configFlag,
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable()));
- assertEquals(configFlag,
Boolean.parseBoolean(metaClient.getTableConfig().getUrlEncodePartitioning()));
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ deltaStreamer.getIngestionService().ingestOnce();
+ // Create new metaClient from tablePath
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get())
+ .setBasePath(tablePath).build();
+ assertEquals(configFlag,
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable()));
+ assertEquals(configFlag,
Boolean.parseBoolean(metaClient.getTableConfig().getUrlEncodePartitioning()));
}
@ParameterizedTest
@@ -503,31 +495,27 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.add("hoodie.datasource.write.hive_style_partitioning=true");
cfg.configs.add("hoodie.bootstrap.parallelism=5");
cfg.targetBasePath = newDatasetBasePath;
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds.sync();
- Dataset<Row> res =
sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath);
- LOG.info("Schema :");
- res.printSchema();
- assertRecordCount(1950, newDatasetBasePath, sqlContext);
- res.registerTempTable("bootstrapped");
- assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key
from bootstrapped").count());
- // NOTE: To fetch record's count Spark will optimize the query fetching
minimal possible amount
- // of data, which might not provide adequate amount of test
coverage
- sqlContext.sql("select * from bootstrapped").show();
-
- StructField[] fields = res.schema().fields();
- List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
- List<String> expectedFieldNames =
Arrays.asList(sourceDf.schema().fieldNames());
- assertEquals(expectedFieldNames.size(), fields.length);
- assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
- assertTrue(fieldNames.containsAll(expectedFieldNames));
- UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
- UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, bootstrapSourcePath);
- UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, newDatasetBasePath);
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ Dataset<Row> res =
sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath);
+ LOG.info("Schema :");
+ res.printSchema();
+
+ assertRecordCount(1950, newDatasetBasePath, sqlContext);
+ res.registerTempTable("bootstrapped");
+ assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from
bootstrapped").count());
+ // NOTE: To fetch record's count Spark will optimize the query fetching
minimal possible amount
+ // of data, which might not provide adequate amount of test coverage
+ sqlContext.sql("select * from bootstrapped").show();
+
+ StructField[] fields = res.schema().fields();
+ List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
+ List<String> expectedFieldNames =
Arrays.asList(sourceDf.schema().fieldNames());
+ assertEquals(expectedFieldNames.size(), fields.length);
+ assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
+ assertTrue(fieldNames.containsAll(expectedFieldNames));
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, bootstrapSourcePath);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, newDatasetBasePath);
}
@Test
@@ -560,15 +548,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg,
Integer expected, String tableBasePath, String metadata, Integer totalCommits)
throws Exception {
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds.sync();
- assertRecordCount(expected, tableBasePath, sqlContext);
- assertDistanceCount(expected, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata(metadata, tableBasePath, fs,
totalCommits);
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ assertRecordCount(expected, tableBasePath, sqlContext);
+ assertDistanceCount(expected, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata(metadata, tableBasePath, fs,
totalCommits);
}
// TODO add tests w/ disabled reconciliation
@@ -587,14 +570,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
if (!useSchemaPostProcessor) {
cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key()
+ "=false");
}
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds.sync();
- assertRecordCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ assertRecordCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+
// Upsert data produced with Schema B, pass Schema B
cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
@@ -605,12 +584,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
if (!useSchemaPostProcessor) {
cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key()
+ "=false");
}
- ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds.sync();
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
assertRecordCount(1450, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
@@ -637,17 +611,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key()
+ "=false");
}
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
- ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds.sync();
- // again, 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
- assertRecordCount(1900, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
- counts = countsPerCommit(tableBasePath, sqlContext);
- assertEquals(1900, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ // again, 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
+ assertRecordCount(1900, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
+ counts = countsPerCommit(tableBasePath, sqlContext);
+ assertEquals(1900, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
@@ -686,15 +655,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(),
MetricsReporterType.INMEMORY.name()));
cfg.continuousMode = false;
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds.sync();
- assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
- assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be
shutdown");
- assertFalse(Metrics.isInitialized(tableBasePath + "/.hoodie/metadata"),
"Metrics should be shutdown for metadata table");
- UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
- } finally {
- ds.shutdownGracefully();
- }
+ ds.sync();
+ assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
+ assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be
shutdown");
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
@Timeout(600)
@@ -722,15 +686,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(),
MetricsReporterType.INMEMORY.name()));
cfg.continuousMode = false;
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds.sync();
- assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
- assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be
shutdown");
- assertFalse(Metrics.isInitialized(tableBasePath + "/.hoodie/metadata"),
"Metrics should be shutdown for metadata table");
- UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
- } finally {
- ds.shutdownGracefully();
- }
+ ds.sync();
+ assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
+ assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be
shutdown");
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
private void testUpsertsContinuousMode(HoodieTableType tableType, String
tempDir, HoodieRecordType recordType) throws Exception {
@@ -851,35 +810,27 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- HoodieDeltaStreamer ds2 = null;
- try {
- ds.sync();
- // assert ingest successful
- TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
-
- // schedule a clustering job to build a clustering plan and transition
to inflight
- HoodieClusteringJob clusteringJob =
initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
- clusteringJob.cluster(0);
- HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
- List<HoodieInstant> hoodieClusteringInstants =
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants();
- HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
-
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
Option.empty());
-
- // do another ingestion with inline clustering enabled
- cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false",
"true", "2", "", ""));
- cfg.retryLastPendingInlineClusteringJob = true;
- ds2 = new HoodieDeltaStreamer(cfg, jsc);
- ds2.sync();
- String completeClusteringTimeStamp =
meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
- assertEquals(clusteringRequest.getTimestamp(),
completeClusteringTimeStamp);
- TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
- TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
- } finally {
- ds.shutdownGracefully();
- if (ds2 != null) {
- ds2.shutdownGracefully();
- }
- }
+ ds.sync();
+ // assert ingest successful
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+
+ // schedule a clustering job to build a clustering plan and transition to
inflight
+ HoodieClusteringJob clusteringJob =
initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
+ clusteringJob.cluster(0);
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+ List<HoodieInstant> hoodieClusteringInstants =
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants();
+ HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
+
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
Option.empty());
+
+ // do another ingestion with inline clustering enabled
+ cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true",
"2", "", ""));
+ cfg.retryLastPendingInlineClusteringJob = true;
+ HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
+ ds2.sync();
+ String completeClusteringTimeStamp =
meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
+ assertEquals(clusteringRequest.getTimestamp(),
completeClusteringTimeStamp);
+ TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
}
@Test
@@ -902,32 +853,28 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// sync twice and trigger compaction
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
- try {
- deltaStreamer.sync();
- assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
- prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false,
null, null, dataGenerator, "001");
- deltaStreamer.sync();
- TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
- TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
-
- // delete compaction commit
- HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
- HoodieTimeline timeline =
meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
- HoodieInstant commitInstant = timeline.lastInstant().get();
- String commitFileName = tableBasePath + "/.hoodie/" +
commitInstant.getFileName();
- fs.delete(new Path(commitFileName), false);
-
- // sync again
- prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "3.parquet", false,
null, null, dataGenerator, "002");
- deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
- deltaStreamer.sync();
- TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
- meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
- timeline = meta.getActiveTimeline().getRollbackTimeline();
- assertEquals(1, timeline.getInstants().size());
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ deltaStreamer.sync();
+ assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+ prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false,
null, null, dataGenerator, "001");
+ deltaStreamer.sync();
+ TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
+ TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
+
+ // delete compaction commit
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+ HoodieTimeline timeline =
meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+ HoodieInstant commitInstant = timeline.lastInstant().get();
+ String commitFileName = tableBasePath + "/.hoodie/" +
commitInstant.getFileName();
+ fs.delete(new Path(commitFileName), false);
+
+ // sync again
+ prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "3.parquet", false,
null, null, dataGenerator, "002");
+ deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
+ deltaStreamer.sync();
+ TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
+ meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+ timeline = meta.getActiveTimeline().getRollbackTimeline();
+ assertEquals(1, timeline.getInstants().size());
}
@ParameterizedTest
@@ -949,104 +896,100 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.add(String.format("%s=%s",
HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
cfg.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- deltaStreamerTestRunner(ds, cfg, (r) -> {
- TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
- return true;
- });
-
- TestHelpers.assertAtLeastNCommits(6, tableBasePath, fs);
+ deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
+ return true;
+ });
- // Step 2 : Get the first replacecommit and extract the corresponding
replaced file IDs.
- HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
- HoodieTimeline replacedTimeline =
meta.reloadActiveTimeline().getCompletedReplaceTimeline();
- Option<HoodieInstant> firstReplaceHoodieInstant =
replacedTimeline.nthFromLastInstant(1);
- assertTrue(firstReplaceHoodieInstant.isPresent());
-
- Option<byte[]> firstReplaceHoodieInstantDetails =
replacedTimeline.getInstantDetails(firstReplaceHoodieInstant.get());
- HoodieReplaceCommitMetadata firstReplaceMetadata =
HoodieReplaceCommitMetadata.fromBytes(firstReplaceHoodieInstantDetails.get(),
HoodieReplaceCommitMetadata.class);
- Map<String, List<String>> partitionToReplaceFileIds =
firstReplaceMetadata.getPartitionToReplaceFileIds();
- String partitionName = null;
- List replacedFileIDs = null;
- for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
- partitionName = String.valueOf(entry.getKey());
- replacedFileIDs = (List) entry.getValue();
- }
-
- assertNotNull(partitionName);
- assertNotNull(replacedFileIDs);
-
- // Step 3 : Based to replacedFileIDs , get the corresponding complete
path.
- ArrayList<String> replacedFilePaths = new ArrayList<>();
- Path partitionPath = new Path(meta.getBasePath(), partitionName);
- RemoteIterator<LocatedFileStatus> hoodieFiles =
meta.getFs().listFiles(partitionPath, true);
- while (hoodieFiles.hasNext()) {
- LocatedFileStatus f = hoodieFiles.next();
- String file = f.getPath().toUri().toString();
- for (Object replacedFileID : replacedFileIDs) {
- if (file.contains(String.valueOf(replacedFileID))) {
- replacedFilePaths.add(file);
- }
+ TestHelpers.assertAtLeastNCommits(6, tableBasePath, fs);
+ TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
+
+ // Step 2 : Get the first replacecommit and extract the corresponding
replaced file IDs.
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+ HoodieTimeline replacedTimeline =
meta.reloadActiveTimeline().getCompletedReplaceTimeline();
+ Option<HoodieInstant> firstReplaceHoodieInstant =
replacedTimeline.nthFromLastInstant(1);
+ assertTrue(firstReplaceHoodieInstant.isPresent());
+
+ Option<byte[]> firstReplaceHoodieInstantDetails =
replacedTimeline.getInstantDetails(firstReplaceHoodieInstant.get());
+ HoodieReplaceCommitMetadata firstReplaceMetadata =
HoodieReplaceCommitMetadata.fromBytes(firstReplaceHoodieInstantDetails.get(),
HoodieReplaceCommitMetadata.class);
+ Map<String, List<String>> partitionToReplaceFileIds =
firstReplaceMetadata.getPartitionToReplaceFileIds();
+ String partitionName = null;
+ List replacedFileIDs = null;
+ for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
+ partitionName = String.valueOf(entry.getKey());
+ replacedFileIDs = (List) entry.getValue();
+ }
+
+ assertNotNull(partitionName);
+ assertNotNull(replacedFileIDs);
+
+ // Step 3 : Based to replacedFileIDs , get the corresponding complete path.
+ ArrayList<String> replacedFilePaths = new ArrayList<>();
+ Path partitionPath = new Path(meta.getBasePath(), partitionName);
+ RemoteIterator<LocatedFileStatus> hoodieFiles =
meta.getFs().listFiles(partitionPath, true);
+ while (hoodieFiles.hasNext()) {
+ LocatedFileStatus f = hoodieFiles.next();
+ String file = f.getPath().toUri().toString();
+ for (Object replacedFileID : replacedFileIDs) {
+ if (file.contains(String.valueOf(replacedFileID))) {
+ replacedFilePaths.add(file);
}
}
+ }
- assertFalse(replacedFilePaths.isEmpty());
-
- // Step 4 : Add commits with insert of 1 record and trigger sync/async
cleaner and archive.
- List<String> configs = getTableServicesConfigs(1, "true", "true", "6",
"", "");
- configs.add(String.format("%s=%s",
HoodieCleanConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
- configs.add(String.format("%s=%s",
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
- configs.add(String.format("%s=%s",
HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4"));
- configs.add(String.format("%s=%s",
HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5"));
- configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(),
asyncClean));
- configs.add(String.format("%s=%s",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
- configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(),
"DIRECT"));
- if (asyncClean) {
- configs.add(String.format("%s=%s",
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
- WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
- configs.add(String.format("%s=%s",
HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
- HoodieFailedWritesCleaningPolicy.LAZY.name()));
- configs.add(String.format("%s=%s",
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
- InProcessLockProvider.class.getName()));
- }
- addRecordMerger(recordType, configs);
- cfg.configs = configs;
- cfg.continuousMode = false;
- // timeline as of now. no cleaner and archival kicked in.
- // c1, c2, rc3, c4, c5, rc6,
-
- ds = new HoodieDeltaStreamer(cfg, jsc);
- ds.sync();
- // after 1 round of sync, timeline will be as follows
- // just before clean
- // c1, c2, rc3, c4, c5, rc6, c7
- // after clean
- // c1, c2, rc3, c4, c5, rc6, c7, c8.clean (earliest commit to retain is
c7)
- // after archival (retain 4 commits)
- // c4, c5, rc6, c7, c8.clean
-
- // old code has 2 sync() calls. book-keeping the sequence for now.
- // after 2nd round of sync
- // just before clean
- // c4, c5, rc6, c7, c8.clean, c9
- // after clean
- // c4, c5, rc6, c7, c8.clean, c9, c10.clean (earliest commit to retain
c9)
- // after archival
- // c5, rc6, c7, c8.clean, c9, c10.clean
-
- // Step 5 : FirstReplaceHoodieInstant should not be retained.
- long count =
meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstantsAsStream().filter(instant
-> firstReplaceHoodieInstant.get().equals(instant)).count();
- assertEquals(0, count);
-
- // Step 6 : All the replaced files in firstReplaceHoodieInstant should
be deleted through sync/async cleaner.
- for (String replacedFilePath : replacedFilePaths) {
- assertFalse(meta.getFs().exists(new Path(replacedFilePath)));
- }
- UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
- } finally {
- ds.shutdownGracefully();
+ assertFalse(replacedFilePaths.isEmpty());
+
+ // Step 4 : Add commits with insert of 1 record and trigger sync/async
cleaner and archive.
+ List<String> configs = getTableServicesConfigs(1, "true", "true", "6", "",
"");
+ configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_POLICY.key(),
"KEEP_LATEST_COMMITS"));
+ configs.add(String.format("%s=%s",
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
+ configs.add(String.format("%s=%s",
HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4"));
+ configs.add(String.format("%s=%s",
HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5"));
+ configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(),
asyncClean));
+ configs.add(String.format("%s=%s",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
+ configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(),
"DIRECT"));
+ if (asyncClean) {
+ configs.add(String.format("%s=%s",
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
+ WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+ configs.add(String.format("%s=%s",
HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+ HoodieFailedWritesCleaningPolicy.LAZY.name()));
+ configs.add(String.format("%s=%s",
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
+ InProcessLockProvider.class.getName()));
+ }
+ addRecordMerger(recordType, configs);
+ cfg.configs = configs;
+ cfg.continuousMode = false;
+ // timeline as of now. no cleaner and archival kicked in.
+ // c1, c2, rc3, c4, c5, rc6,
+
+ ds = new HoodieDeltaStreamer(cfg, jsc);
+ ds.sync();
+ // after 1 round of sync, timeline will be as follows
+ // just before clean
+ // c1, c2, rc3, c4, c5, rc6, c7
+ // after clean
+ // c1, c2, rc3, c4, c5, rc6, c7, c8.clean (earliest commit to retain is c7)
+ // after archival (retain 4 commits)
+ // c4, c5, rc6, c7, c8.clean
+
+ // old code has 2 sync() calls. book-keeping the sequence for now.
+ // after 2nd round of sync
+ // just before clean
+ // c4, c5, rc6, c7, c8.clean, c9
+ // after clean
+ // c4, c5, rc6, c7, c8.clean, c9, c10.clean (earliest commit to retain c9)
+ // after archival
+ // c5, rc6, c7, c8.clean, c9, c10.clean
+
+ // Step 5 : FirstReplaceHoodieInstant should not be retained.
+ long count =
meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstantsAsStream().filter(instant
-> firstReplaceHoodieInstant.get().equals(instant)).count();
+ assertEquals(0, count);
+
+ // Step 6 : All the replaced files in firstReplaceHoodieInstant should be
deleted through sync/async cleaner.
+ for (String replacedFilePath : replacedFilePaths) {
+ assertFalse(meta.getFs().exists(new Path(replacedFilePath)));
}
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
private List<String> getAllMultiWriterConfigs() {
@@ -1284,47 +1227,39 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "false",
"0", "false", "0"));
cfg.configs.addAll(getAllMultiWriterConfigs());
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- HoodieDeltaStreamer ds2 = null;
- try {
- ds.sync();
+ ds.sync();
- // assert ingest successful
- TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+ // assert ingest successful
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
- // schedule a clustering job to build a clustering plan
- HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath,
null, false, "schedule");
- schedule.cluster(0);
+ // schedule a clustering job to build a clustering plan
+ HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath,
null, false, "schedule");
+ schedule.cluster(0);
- // do another ingestion
- ds2 = new HoodieDeltaStreamer(cfg, jsc);
- ds2.sync();
+ // do another ingestion
+ HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
+ ds2.sync();
- // convert clustering request into inflight, Simulate the last
clustering failed scenario
- HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
- List<HoodieInstant> hoodieClusteringInstants =
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants();
- HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
- HoodieInstant hoodieInflightInstant =
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
Option.empty());
+ // convert clustering request into inflight, Simulate the last clustering
failed scenario
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+ List<HoodieInstant> hoodieClusteringInstants =
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants();
+ HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
+ HoodieInstant hoodieInflightInstant =
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
Option.empty());
- // trigger a scheduleAndExecute clustering job
- // when retryFailedClustering true => will rollback and re-execute
failed clustering plan with same instant timestamp.
- // when retryFailedClustering false => will make and execute a new
clustering plan with new instant timestamp.
- HoodieClusteringJob scheduleAndExecute =
initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute",
retryLastFailedClusteringJob, recordType);
- scheduleAndExecute.cluster(0);
+ // trigger a scheduleAndExecute clustering job
+ // when retryFailedClustering true => will rollback and re-execute failed
clustering plan with same instant timestamp.
+ // when retryFailedClustering false => will make and execute a new
clustering plan with new instant timestamp.
+ HoodieClusteringJob scheduleAndExecute =
initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute",
retryLastFailedClusteringJob, recordType);
+ scheduleAndExecute.cluster(0);
- String completeClusteringTimeStamp =
meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
+ String completeClusteringTimeStamp =
meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
- if (retryLastFailedClusteringJob) {
- assertEquals(clusteringRequest.getTimestamp(),
completeClusteringTimeStamp);
- } else {
-
assertFalse(clusteringRequest.getTimestamp().equalsIgnoreCase(completeClusteringTimeStamp));
- }
- UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
- } finally {
- ds.shutdownGracefully();
- if (ds2 != null) {
- ds2.shutdownGracefully();
- }
+ if (retryLastFailedClusteringJob) {
+ assertEquals(clusteringRequest.getTimestamp(),
completeClusteringTimeStamp);
+ } else {
+
assertFalse(clusteringRequest.getTimestamp().equalsIgnoreCase(completeClusteringTimeStamp));
}
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
@ParameterizedTest
@@ -1550,77 +1485,51 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
addRecordMerger(recordType, cfg.configs);
// NOTE: We should not have need to set below config, 'datestr' should
have assumed date partitioning
cfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day");
- String lastInstantForUpstreamTable = null;
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, fs,
hiveServer.getHiveConf());
- try {
- ds.sync();
- assertRecordCount(1000, tableBasePath, sqlContext);
- assertDistanceCount(1000, tableBasePath, sqlContext);
- assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
- lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00000",
tableBasePath, fs, 1);
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ assertRecordCount(1000, tableBasePath, sqlContext);
+ assertDistanceCount(1000, tableBasePath, sqlContext);
+ assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
+ String lastInstantForUpstreamTable =
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
// Now incrementally pull from the above hudi table and ingest to second
table
HoodieDeltaStreamer.Config downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath,
downstreamTableBasePath, WriteOperationType.BULK_INSERT,
true, null);
addRecordMerger(recordType, downstreamCfg.configs);
- ds = new HoodieDeltaStreamer(downstreamCfg, jsc, fs,
hiveServer.getHiveConf());
- try {
- ds.sync();
- assertRecordCount(1000, downstreamTableBasePath, sqlContext);
- assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
- assertDistanceCountWithExactValue(1000, downstreamTableBasePath,
sqlContext);
- TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, fs, 1);
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(downstreamCfg, jsc, fs,
hiveServer.getHiveConf()).sync();
+ assertRecordCount(1000, downstreamTableBasePath, sqlContext);
+ assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
+ assertDistanceCountWithExactValue(1000, downstreamTableBasePath,
sqlContext);
+ TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, fs, 1);
// No new data => no commits for upstream table
cfg.sourceLimit = 0;
- ds = new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
- try {
- ds.sync();
- assertRecordCount(1000, tableBasePath, sqlContext);
- assertDistanceCount(1000, tableBasePath, sqlContext);
- assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ assertRecordCount(1000, tableBasePath, sqlContext);
+ assertDistanceCount(1000, tableBasePath, sqlContext);
+ assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
// with no change in upstream table, no change in downstream too when
pulled.
HoodieDeltaStreamer.Config downstreamCfg1 =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath,
downstreamTableBasePath,
WriteOperationType.BULK_INSERT, true,
DummySchemaProvider.class.getName());
- ds = new HoodieDeltaStreamer(downstreamCfg1, jsc);
- try {
- ds.sync();
- assertRecordCount(1000, downstreamTableBasePath, sqlContext);
- assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
- assertDistanceCountWithExactValue(1000, downstreamTableBasePath,
sqlContext);
- TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, fs, 1);
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
+ assertRecordCount(1000, downstreamTableBasePath, sqlContext);
+ assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
+ assertDistanceCountWithExactValue(1000, downstreamTableBasePath,
sqlContext);
+ TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, fs, 1);
// upsert() #1 on upstream hudi table
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.UPSERT;
- ds = new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
- try {
- ds.sync();
- assertRecordCount(1950, tableBasePath, sqlContext);
- assertDistanceCount(1950, tableBasePath, sqlContext);
- assertDistanceCountWithExactValue(1950, tableBasePath, sqlContext);
- lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001",
tableBasePath, fs, 2);
- List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
- assertEquals(1950, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ assertRecordCount(1950, tableBasePath, sqlContext);
+ assertDistanceCount(1950, tableBasePath, sqlContext);
+ assertDistanceCountWithExactValue(1950, tableBasePath, sqlContext);
+ lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001",
tableBasePath, fs, 2);
+ List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
+ assertEquals(1950, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
// Incrementally pull changes in upstream hudi table and apply to
downstream table
downstreamCfg =
@@ -1628,19 +1537,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
false, null);
addRecordMerger(recordType, downstreamCfg.configs);
downstreamCfg.sourceLimit = 2000;
- ds = new HoodieDeltaStreamer(downstreamCfg, jsc);
- try {
- ds.sync();
- assertRecordCount(2000, downstreamTableBasePath, sqlContext);
- assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
- assertDistanceCountWithExactValue(2000, downstreamTableBasePath,
sqlContext);
- String finalInstant =
- TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, fs, 2);
- List<Row> counts = countsPerCommit(downstreamTableBasePath, sqlContext);
- assertEquals(2000, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
+ assertRecordCount(2000, downstreamTableBasePath, sqlContext);
+ assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
+ assertDistanceCountWithExactValue(2000, downstreamTableBasePath,
sqlContext);
+ String finalInstant =
+ TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, fs, 2);
+ counts = countsPerCommit(downstreamTableBasePath, sqlContext);
+ assertEquals(2000, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
// Test Hive integration
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath,
"hive_trips");
@@ -1677,27 +1581,18 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
true, false, null, "MERGE_ON_READ");
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, fs,
hiveServer.getHiveConf());
- try {
- ds.sync();
- assertRecordCount(1000, dataSetBasePath, sqlContext);
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ assertRecordCount(1000, dataSetBasePath, sqlContext);
//now create one more deltaStreamer instance and update payload class
cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
- try {
- ds = new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
- //now assert that hoodie.properties file now has updated payload class
name
- HoodieTableMetaClient metaClient = createMetaClient(jsc,
dataSetBasePath, false);
- assertEquals(metaClient.getTableConfig().getPayloadClass(),
DummyAvroPayload.class.getName());
- } finally {
- ds.shutdownGracefully();
- }
+ //now assert that hoodie.properties file now has updated payload class name
+ HoodieTableMetaClient metaClient = createMetaClient(jsc, dataSetBasePath,
false);
+ assertEquals(metaClient.getTableConfig().getPayloadClass(),
DummyAvroPayload.class.getName());
}
@Test
@@ -1706,17 +1601,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
true, true, PartialUpdateAvroPayload.class.getName(),
"MERGE_ON_READ");
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, fs,
hiveServer.getHiveConf());
- try {
- ds.sync();
- assertRecordCount(1000, dataSetBasePath, sqlContext);
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ assertRecordCount(1000, dataSetBasePath, sqlContext);
- //now assert that hoodie.properties file now has updated payload class
name
- HoodieTableMetaClient metaClient = createMetaClient(jsc,
dataSetBasePath, false);
- assertEquals(metaClient.getTableConfig().getPayloadClass(),
PartialUpdateAvroPayload.class.getName());
- } finally {
- ds.shutdownGracefully();
- }
+ //now assert that hoodie.properties file now has updated payload class name
+ HoodieTableMetaClient metaClient = createMetaClient(jsc, dataSetBasePath,
false);
+ assertEquals(metaClient.getTableConfig().getPayloadClass(),
PartialUpdateAvroPayload.class.getName());
}
@Test
@@ -1725,33 +1615,24 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
true, false, null, null);
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, fs,
hiveServer.getHiveConf());
- try {
- ds.sync();
- assertRecordCount(1000, dataSetBasePath, sqlContext);
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ assertRecordCount(1000, dataSetBasePath, sqlContext);
//now create one more deltaStreamer instance and update payload class
cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
true, true, DummyAvroPayload.class.getName(), null);
- ds = new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
-
- try {
- //now assert that hoodie.properties file does not have payload class
prop since it is a COW table
- Properties props = new Properties();
- String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
- FileSystem fs = FSUtils.getFs(cfg.targetBasePath,
jsc.hadoopConfiguration());
- try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
- props.load(inputStream);
- }
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
-
assertFalse(props.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
- } finally {
- ds.shutdownGracefully();
+ //now assert that hoodie.properties file does not have payload class prop
since it is a COW table
+ Properties props = new Properties();
+ String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
+ FileSystem fs = FSUtils.getFs(cfg.targetBasePath,
jsc.hadoopConfiguration());
+ try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
+ props.load(inputStream);
}
+
+ assertFalse(props.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
}
@ParameterizedTest
@@ -1762,12 +1643,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT);
addRecordMerger(recordType, cfg.configs);
- HoodieDeltaStreamer ds1 = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds1.sync();
- } finally {
- ds1.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
@@ -1775,12 +1651,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.filterDupes = true;
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.INSERT;
- HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds2.sync();
- } finally {
- ds2.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
assertRecordCount(2000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
// 1000 records for commit 00000 & 1000 for commit 00001
@@ -1797,12 +1668,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg2.sourceLimit = 2000;
cfg2.operation = WriteOperationType.UPSERT;
cfg2.configs.add(String.format("%s=false",
HoodieCleanConfig.AUTO_CLEAN.key()));
- HoodieDeltaStreamer ds3 = new HoodieDeltaStreamer(cfg2, jsc);
- try {
- ds3.sync();
- } finally {
- ds3.shutdownGracefully();
- }
+ HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
+ ds2.sync();
mClient =
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
HoodieInstant newLastFinished =
mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(),
HoodieTimeline.GREATER_THAN, lastFinished.getTimestamp()
@@ -1817,13 +1684,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// Try UPSERT with filterDupes true. Expect exception
cfg2.filterDupes = true;
cfg2.operation = WriteOperationType.UPSERT;
- HoodieDeltaStreamer ds4 = new HoodieDeltaStreamer(cfg2, jsc);
try {
- ds4.sync();
+ new HoodieDeltaStreamer(cfg2, jsc).sync();
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("'--filter-dupes' needs to be
disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
- } finally {
- ds4.shutdownGracefully();
}
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
@@ -1876,45 +1740,42 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null);
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
- try {
- deltaStreamer.sync();
- assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
- } finally {
- deltaStreamer.shutdownGracefully();
- }
- HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(cfg, jsc);
- try {
- if (testEmptyBatch) {
- prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false,
null, null);
- prepareParquetDFSSource(useSchemaProvider, hasTransformer,
"source.avsc", "target.avsc",
- PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false,
"partition_path", "0");
- deltaStreamer1.sync();
- // since we mimic'ed empty batch, total records should be same as
first sync().
- assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
- HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
-
- // validate table schema fetches valid schema from last but one commit.
- TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(metaClient);
- assertNotEquals(tableSchemaResolver.getTableAvroSchema(),
Schema.create(Schema.Type.NULL).toString());
- // schema from latest commit and last but one commit should match
- compareLatestTwoSchemas(metaClient);
- prepareParquetDFSSource(useSchemaProvider, hasTransformer,
"source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
- PARQUET_SOURCE_ROOT, false, "partition_path", "");
- }
+ deltaStreamer.sync();
+ assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+ deltaStreamer.shutdownGracefully();
- // proceed w/ non empty batch.
- prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false,
null, null);
- deltaStreamer.sync();
- assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);
- // validate commit metadata for all completed commits to have valid
schema in extra metadata.
+ if (testEmptyBatch) {
+ prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false,
null, null);
+ prepareParquetDFSSource(useSchemaProvider, hasTransformer,
"source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+ PARQUET_SOURCE_ROOT, false, "partition_path", "0");
+ HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(cfg, jsc);
+ deltaStreamer1.sync();
+ // since we mimic'ed empty batch, total records should be same as first
sync().
+ assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
-
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants()
- .forEach(entry ->
assertValidSchemaAndOperationTypeInCommitMetadata(
- entry, metaClient, WriteOperationType.INSERT));
- testNum++;
- } finally {
+
+ // validate table schema fetches valid schema from last but one commit.
+ TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(metaClient);
+ assertNotEquals(tableSchemaResolver.getTableAvroSchema(),
Schema.create(Schema.Type.NULL).toString());
+ // schema from latest commit and last but one commit should match
+ compareLatestTwoSchemas(metaClient);
+ prepareParquetDFSSource(useSchemaProvider, hasTransformer,
"source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+ PARQUET_SOURCE_ROOT, false, "partition_path", "");
deltaStreamer1.shutdownGracefully();
}
+
+ // proceed w/ non empty batch.
+ prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null,
null);
+ deltaStreamer.sync();
+ assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);
+ // validate commit metadata for all completed commits to have valid schema
in extra metadata.
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
+ metaClient.reloadActiveTimeline().getCommitsTimeline()
+ .filterCompletedInstants().getInstants()
+ .forEach(entry -> assertValidSchemaAndOperationTypeInCommitMetadata(
+ entry, metaClient, WriteOperationType.INSERT));
+ testNum++;
+ deltaStreamer.shutdownGracefully();
}
private void assertValidSchemaAndOperationTypeInCommitMetadata(HoodieInstant
instant,
@@ -1961,13 +1822,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
ORCDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null),
jsc);
- try {
- deltaStreamer.sync();
- assertRecordCount(ORC_NUM_RECORDS, tableBasePath, sqlContext);
- testNum++;
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ deltaStreamer.sync();
+ assertRecordCount(ORC_NUM_RECORDS, tableBasePath, sqlContext);
+ testNum++;
}
private void prepareJsonKafkaDFSSource(String propsFileName, String
autoResetValue, String topicName) throws IOException {
@@ -2015,12 +1872,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
ParquetDFSSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
true, 100000, false, null, null, "timestamp", null), jsc);
- try {
- deltaStreamer.sync();
- assertRecordCount(parquetRecords, tableBasePath, sqlContext);
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ deltaStreamer.sync();
+ assertRecordCount(parquetRecords, tableBasePath, sqlContext);
+ deltaStreamer.shutdownGracefully();
// prep json kafka source
topicName = "topic" + testNum;
@@ -2031,21 +1885,17 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
JsonKafkaSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp", null), jsc);
- try {
- deltaStreamer.sync();
- // if auto reset value is set to LATEST, this all kafka records so far
may not be synced.
- int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 :
JSON_KAFKA_NUM_RECORDS);
- assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
+ deltaStreamer.sync();
+ // if auto reset value is set to LATEST, this all kafka records so far may
not be synced.
+ int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 :
JSON_KAFKA_NUM_RECORDS);
+ assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
- // verify 2nd batch to test LATEST auto reset value.
- prepareJsonKafkaDFSFiles(20, false, topicName);
- totalExpectedRecords += 20;
- deltaStreamer.sync();
- assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
- testNum++;
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ // verify 2nd batch to test LATEST auto reset value.
+ prepareJsonKafkaDFSFiles(20, false, topicName);
+ totalExpectedRecords += 20;
+ deltaStreamer.sync();
+ assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
+ testNum++;
}
@Test
@@ -2058,20 +1908,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
JsonKafkaSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp", null), jsc);
+ deltaStreamer.sync();
+ assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
- try {
- deltaStreamer.sync();
- assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
-
- int totalRecords = JSON_KAFKA_NUM_RECORDS;
- int records = 10;
- totalRecords += records;
- prepareJsonKafkaDFSFiles(records, false, topicName);
- deltaStreamer.sync();
- assertRecordCount(totalRecords, tableBasePath, sqlContext);
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ int totalRecords = JSON_KAFKA_NUM_RECORDS;
+ int records = 10;
+ totalRecords += records;
+ prepareJsonKafkaDFSFiles(records, false, topicName);
+ deltaStreamer.sync();
+ assertRecordCount(totalRecords, tableBasePath, sqlContext);
}
@Test
@@ -2088,31 +1933,27 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
JsonKafkaSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp", null), jsc);
- try {
- deltaStreamer.sync();
- sqlContext.clearCache();
- Dataset<Row> ds = sqlContext.read().format("org.apache.hudi").load(tableBasePath);
- assertEquals(numRecords, ds.count());
- //ensure that kafka partition column exists and is populated correctly
- for (int i = 0; i < numPartitions; i++) {
- assertEquals(recsPerPartition, ds.filter(KAFKA_SOURCE_PARTITION_COLUMN + "=" + i).count());
- }
+ deltaStreamer.sync();
+ sqlContext.clearCache();
+ Dataset<Row> ds = sqlContext.read().format("org.apache.hudi").load(tableBasePath);
+ assertEquals(numRecords, ds.count());
+ //ensure that kafka partition column exists and is populated correctly
+ for (int i = 0; i < numPartitions; i++) {
+ assertEquals(recsPerPartition, ds.filter(KAFKA_SOURCE_PARTITION_COLUMN + "=" + i).count());
+ }
- //ensure that kafka timestamp column exists and is populated correctly
- long afterTime = Instant.now().toEpochMilli();
- assertEquals(numRecords, ds.filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + ">" + beforeTime).filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + "<" + afterTime).count());
+ //ensure that kafka timestamp column exists and is populated correctly
+ long afterTime = Instant.now().toEpochMilli();
+ assertEquals(numRecords, ds.filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + ">" + beforeTime).filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + "<" + afterTime).count());
- //ensure that kafka offset column exists and is populated correctly
- sqlContext.read().format("org.apache.hudi").load(tableBasePath).col(KAFKA_SOURCE_OFFSET_COLUMN);
- for (int i = 0; i < recsPerPartition; i++) {
- for (int j = 0; j < numPartitions; j++) {
- //each offset partition pair should be unique
- assertEquals(1, ds.filter(KAFKA_SOURCE_OFFSET_COLUMN + "=" + i).filter(KAFKA_SOURCE_PARTITION_COLUMN + "=" + j).count());
- }
+ //ensure that kafka offset column exists and is populated correctly
+ sqlContext.read().format("org.apache.hudi").load(tableBasePath).col(KAFKA_SOURCE_OFFSET_COLUMN);
+ for (int i = 0; i < recsPerPartition; i++) {
+ for (int j = 0; j < numPartitions; j++) {
+ //each offset partition pair should be unique
+ assertEquals(1, ds.filter(KAFKA_SOURCE_OFFSET_COLUMN + "=" + i).filter(KAFKA_SOURCE_PARTITION_COLUMN + "=" + j).count());
}
- } finally {
- deltaStreamer.shutdownGracefully();
}
}
@@ -2128,22 +1969,17 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null,
null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
- try {
- deltaStreamer.sync();
- deltaStreamer.shutdownGracefully();
- assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
+ deltaStreamer.sync();
+ assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
- prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
- deltaStreamer = new HoodieDeltaStreamer(
- TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
- Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
- true, 100000, false, null, null,
- "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
- deltaStreamer.sync();
- assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext);
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
+ deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+ Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
+ true, 100000, false, null, null,
+ "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
+ deltaStreamer.sync();
+ assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext);
}
@Disabled("HUDI-6609")
@@ -2262,40 +2098,36 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testInitFailure ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
- try {
- deltaStreamer.sync();
+ deltaStreamer.sync();
- if (testInitFailure) {
- FileStatus[] fileStatuses = fs.listStatus(new Path(tableBasePath + "/.hoodie/"));
- Arrays.stream(fileStatuses).filter(entry -> entry.getPath().getName().contains("commit") || entry.getPath().getName().contains("inflight")).forEach(entry -> {
- try {
- fs.delete(entry.getPath());
- } catch (IOException e) {
- LOG.warn("Failed to delete " + entry.getPath().toString(), e);
- }
- });
- }
- // delete hoodie.properties
- fs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"));
-
- // restart the pipeline.
- if (testInitFailure) { // should succeed.
- deltaStreamer = new HoodieDeltaStreamer(
- TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
- null, PROPS_FILENAME_TEST_PARQUET, false,
- useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
- deltaStreamer.sync();
- assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
- } else {
- assertThrows(HoodieIOException.class, () -> new HoodieDeltaStreamer(
- TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
- null, PROPS_FILENAME_TEST_PARQUET, false,
- useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc));
- }
- } finally {
- deltaStreamer.shutdownGracefully();
- testNum++;
+ if (testInitFailure) {
+ FileStatus[] fileStatuses = fs.listStatus(new Path(tableBasePath + "/.hoodie/"));
+ Arrays.stream(fileStatuses).filter(entry -> entry.getPath().getName().contains("commit") || entry.getPath().getName().contains("inflight")).forEach(entry -> {
+ try {
+ fs.delete(entry.getPath());
+ } catch (IOException e) {
+ LOG.warn("Failed to delete " + entry.getPath().toString(), e);
+ }
+ });
+ }
+ // delete hoodie.properties
+ fs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"));
+
+ // restart the pipeline.
+ if (testInitFailure) { // should succeed.
+ deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+ null, PROPS_FILENAME_TEST_PARQUET, false,
+ useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
+ deltaStreamer.sync();
+ assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+ } else {
+ assertThrows(HoodieIOException.class, () -> new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+ null, PROPS_FILENAME_TEST_PARQUET, false,
+ useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc));
}
+ testNum++;
}
@Test
@@ -2381,13 +2213,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
tableBasePath, WriteOperationType.INSERT, CsvDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
useSchemaProvider, 1000, false, null, null, sourceOrderingField, null), jsc);
- try {
- deltaStreamer.sync();
- assertRecordCount(CSV_NUM_RECORDS, tableBasePath, sqlContext);
- } finally {
- deltaStreamer.shutdownGracefully();
- testNum++;
- }
+ deltaStreamer.sync();
+ assertRecordCount(CSV_NUM_RECORDS, tableBasePath, sqlContext);
+ testNum++;
}
@Test
@@ -2505,12 +2333,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
tableBasePath, WriteOperationType.INSERT, SqlSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_SQL_SOURCE, false,
false, 1000, false, null, null, "timestamp", null, true), jsc);
- try {
- deltaStreamer.sync();
- assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ deltaStreamer.sync();
+ assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
}
@Disabled
@@ -2562,12 +2386,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath,
WriteOperationType.BULK_INSERT, true, null);
downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=1");
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(downstreamCfg, jsc);
- try {
- ds.sync();
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
//No change as this fails with Path not exist error
@@ -2584,18 +2403,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
//Adding this conf to make testing easier :)
downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
downstreamCfg.operation = WriteOperationType.UPSERT;
- ds = new HoodieDeltaStreamer(downstreamCfg, jsc);
- try {
- ds.sync();
- } finally {
- ds.shutdownGracefully();
- }
- ds = new HoodieDeltaStreamer(downstreamCfg, jsc);
- try {
- ds.sync();
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
+ new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
long baseTableRecords = sqlContext.read().format("org.apache.hudi").load(tableBasePath).count();
long downStreamTableRecords = sqlContext.read().format("org.apache.hudi").load(downstreamTableBasePath).count();
@@ -2612,6 +2421,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.add("hoodie.keep.min.commits=4");
cfg.configs.add("hoodie.keep.max.commits=5");
cfg.configs.add("hoodie.test.source.generate.inserts=true");
+
for (int i = 0; i < count; i++) {
new HoodieDeltaStreamer(cfg, jsc).sync();
}
@@ -2643,13 +2453,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, null, "timestamp", null), jsc);
- try {
- deltaStreamer.sync();
- assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
- } finally {
- deltaStreamer.shutdownGracefully();
- testNum++;
- }
+ deltaStreamer.sync();
+ assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
+ testNum++;
prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
prepareParquetDFSSource(false, false);
@@ -2658,16 +2464,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.makeConfig(tableBasePath, WriteOperationType.DELETE_PARTITION, ParquetDFSSource.class.getName(),
Collections.singletonList(TestSpecificPartitionTransformer.class.getName()), PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, null, "timestamp", null), jsc);
- try {
- deltaStreamer.sync();
- // No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
- assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+ deltaStreamer.sync();
+ // No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
+ assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
- // There should not be any fileIDs in the deleted partition
- assertTrue(getAllFileIDsInTable(tableBasePath, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ // There should not be any fileIDs in the deleted partition
+ assertTrue(getAllFileIDsInTable(tableBasePath, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());
}
@Test
@@ -2691,15 +2493,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// Initial insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
addRecordMerger(recordType, cfg.configs);
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds.sync();
- assertRecordCount(1000, tableBasePath, sqlContext);
- assertDistanceCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ assertRecordCount(1000, tableBasePath, sqlContext);
+ assertDistanceCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
// Collect the fileIds before running HoodieDeltaStreamer
Set<String> beforeFileIDs = getAllFileIDsInTable(tableBasePath, Option.empty());
@@ -2708,40 +2505,29 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.operation = operationType;
// No new data => no commits.
cfg.sourceLimit = 0;
- ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds.sync();
-
- if (operationType == WriteOperationType.INSERT_OVERWRITE) {
- assertRecordCount(1000, tableBasePath, sqlContext);
- assertDistanceCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
- } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
- HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
- final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
- assertEquals(0, fsView.getLatestFileSlices("").count());
- TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
-
- // Since the table has been overwritten all fileIDs before should have been replaced
- Set<String> afterFileIDs = getAllFileIDsInTable(tableBasePath, Option.empty());
- assertTrue(afterFileIDs.isEmpty());
- }
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ if (operationType == WriteOperationType.INSERT_OVERWRITE) {
+ assertRecordCount(1000, tableBasePath, sqlContext);
+ assertDistanceCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+ } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
+ final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
+ assertEquals(0, fsView.getLatestFileSlices("").count());
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
- cfg.sourceLimit = 1000;
- ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds.sync();
- assertRecordCount(950, tableBasePath, sqlContext);
- assertDistanceCount(950, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
- UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
- } finally {
- ds.shutdownGracefully();
+ // Since the table has been overwritten all fileIDs before should have been replaced
+ Set<String> afterFileIDs = getAllFileIDsInTable(tableBasePath, Option.empty());
+ assertTrue(afterFileIDs.isEmpty());
}
+
+ cfg.sourceLimit = 1000;
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ assertRecordCount(950, tableBasePath, sqlContext);
+ assertDistanceCount(950, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
@Test
@@ -2786,24 +2572,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
addRecordMerger(recordType, cfg.configs);
cfg.configs.add(String.format("%s=%s", HoodieTableConfig.DROP_PARTITION_COLUMNS.key(), "true"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds.sync();
- // assert ingest successful
- TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
-
- TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
- HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
- // get schema from data file written in the latest commit
- Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
- assertNotNull(tableSchema);
-
- List<String> tableFields = tableSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
- // now assert that the partition column is not in the target schema
- assertFalse(tableFields.contains("partition_path"));
- UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
- } finally {
- ds.shutdownGracefully();
- }
+ ds.sync();
+ // assert ingest successful
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+
+ TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
+ HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
+ // get schema from data file written in the latest commit
+ Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
+ assertNotNull(tableSchema);
+
+ List<String> tableFields = tableSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+ // now assert that the partition column is not in the target schema
+ assertFalse(tableFields.contains("partition_path"));
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
@Test
@@ -2816,20 +2598,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.enableMetaSync = true;
cfg.forceEmptyMetaSync = true;
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
- try {
- ds.sync();
- assertRecordCount(0, tableBasePath, sqlContext);
-
- // make sure hive table is present
- HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
- hiveSyncConfig.setHadoopConf(hiveServer.getHiveConf());
- HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig);
- final String tableName = hiveSyncConfig.getString(META_SYNC_TABLE_NAME);
- assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
- } finally {
- ds.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ assertRecordCount(0, tableBasePath, sqlContext);
+
+ // make sure hive table is present
+ HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
+ hiveSyncConfig.setHadoopConf(hiveServer.getHiveConf());
+ HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig);
+ final String tableName = hiveSyncConfig.getString(META_SYNC_TABLE_NAME);
+ assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
}
@Test
@@ -2837,15 +2614,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
String tableBasePath = basePath + "/test_resume_checkpoint_after_changing_cow_to_mor";
// default table type is COW
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
- HoodieDeltaStreamer ds1 = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds1.sync();
- assertRecordCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
- TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
- } finally {
- ds1.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ assertRecordCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
// change cow to mor
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
@@ -2864,39 +2636,29 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// continue deltastreamer
cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
- HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds2.sync();
- // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
- assertRecordCount(1450, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
- List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
- assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
- TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
- // currently there should be 1 deltacommits now
- TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);
- } finally {
- ds2.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
+ assertRecordCount(1450, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+ List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
+ assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+ // currently there should be 1 deltacommits now
+ TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);
// test the table type is already mor
- HoodieDeltaStreamer ds3 = new HoodieDeltaStreamer(cfg, jsc);
- try {
- ds3.sync();
- // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
- // total records should be 1900 now
- assertRecordCount(1900, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
- List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
- assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
- TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
- // currently there should be 2 deltacommits now
- TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
- } finally {
- // clean up
- ds3.shutdownGracefully();
- }
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
+ // total records should be 1900 now
+ assertRecordCount(1900, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
+ counts = countsPerCommit(tableBasePath, sqlContext);
+ assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+ // currently there should be 2 deltacommits now
+ TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
+ // clean up
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
@@ -2984,21 +2746,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null);
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc);
- try {
- deltaStreamer.sync();
- assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ deltaStreamer.sync();
+ assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
prepareParquetDFSFiles(200, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
- try {
- deltaStreamer.sync();
- assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext);
- testNum++;
- } finally {
- deltaStreamer.shutdownGracefully();
- }
+ deltaStreamer.sync();
+ assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext);
+ testNum++;
}
@ParameterizedTest