This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 456f6731cc4fb29abbc3c9fbd51a9c798efab310 Author: Lokesh Jain <[email protected]> AuthorDate: Tue Sep 12 02:04:24 2023 +0530 [HUDI-6753] Fix parquet inline reading flaky test (#9618) --- .../deltastreamer/HoodieDeltaStreamerTestBase.java | 269 +++++++++++- .../deltastreamer/TestHoodieDeltaStreamer.java | 472 +++++---------------- .../TestHoodieDeltaStreamerDAGExecution.java | 4 +- .../TestHoodieDeltaStreamerWithMultiWriter.java | 127 +++--- .../TestHoodieMultiTableDeltaStreamer.java | 12 +- .../utilities/deltastreamer/TestTransformer.java | 4 +- .../utilities/testutils/UtilitiesTestBase.java | 3 +- 7 files changed, 462 insertions(+), 429 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java index 3c5b45b35c1..b117b2001fa 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java @@ -21,6 +21,7 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -34,6 +35,7 @@ import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.utilities.config.SourceTestConfig; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.sources.HoodieIncrSource; import org.apache.hudi.utilities.sources.TestDataSource; import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; @@ -41,18 +43,27 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; import org.apache.spark.streaming.kafka010.KafkaTestUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; import static org.apache.hudi.common.util.StringUtils.nonEmpty; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL; @@ -62,9 +73,14 @@ import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NA import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME; +import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { + private static final Logger LOG = LoggerFactory.getLogger(HoodieDeltaStreamerTestBase.class); + static final Random RANDOM = new Random(); static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties"; static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties"; @@ -111,6 +127,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); protected static int testNum = 1; + Map<String, String> hudiOpts = new HashMap<>(); + protected static void prepareTestSetup() throws IOException { PARQUET_SOURCE_ROOT = basePath + "/parquetFiles"; ORC_SOURCE_ROOT = basePath + "/orcFiles"; @@ -230,8 +248,9 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { } @BeforeEach - public void resetTestDataSource() { + public void setupTest() { TestDataSource.returnEmptyBatch = false; + hudiOpts = new HashMap<>(); } protected static void populateInvalidTableConfigFilePathProps(TypedProperties props, String dfsBasePath) { @@ -431,4 +450,252 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); } + void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) { + sqlContext.clearCache(); + long recordCount = sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tablePath).count(); + assertEquals(expected, recordCount); + } + + void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) { + sqlContext.clearCache(); + long recordCount = sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count(); + assertEquals(expected, recordCount); + } + + List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) { + sqlContext.clearCache(); + List<Row> rows = sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tablePath) + .groupBy("_hoodie_commit_time").count() + .sort("_hoodie_commit_time").collectAsList(); + return rows; + } + + void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) { + sqlContext.clearCache(); + sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips"); + long recordCount = + sqlContext.sql("select * from tmp_trips where haversine_distance is not NULL").count(); + assertEquals(expected, recordCount); + } + + void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) { + sqlContext.clearCache(); + sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips"); + long recordCount = + sqlContext.sql("select * from tmp_trips where haversine_distance = 1.0").count(); + assertEquals(expected, recordCount); + } + + Map<String, Long> getPartitionRecordCount(String basePath, SQLContext sqlContext) { + sqlContext.clearCache(); + List<Row> rows = sqlContext.read().options(hudiOpts).format("org.apache.hudi") + .load(basePath) + .groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + .count() + .collectAsList(); + Map<String, Long> partitionRecordCount = new HashMap<>(); + rows.stream().forEach(row -> partitionRecordCount.put(row.getString(0), row.getLong(1))); + return partitionRecordCount; + } + + void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String partitionToValidate) { + sqlContext.clearCache(); + assertEquals(0, sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(basePath) + .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " = " + partitionToValidate) + .count()); + } + + static class TestHelpers { + + static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) { + return makeConfig(basePath, op, Collections.singletonList(TestHoodieDeltaStreamer.DropAllTransformer.class.getName())); + } + + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op) { + return makeConfig(basePath, op, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); + } + + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames) { + return makeConfig(basePath, op, transformerClassNames, PROPS_FILENAME_TEST_SOURCE, false); + } + + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames, + String propsFilename, boolean enableHiveSync) { + return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true, + false, null, null); + } + + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames, + String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, + String payloadClassName, String tableType) { + return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync, + useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp", null); + } + + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, + List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, + int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, + String checkpoint) { + return makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName, + tableType, sourceOrderingField, checkpoint, false); + } + + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, + List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, + int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, + String checkpoint, boolean allowCommitOnNoCheckpointChange) { + HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); + cfg.targetBasePath = basePath; + cfg.targetTableName = "hoodie_trips"; + cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType; + cfg.sourceClassName = sourceClassName; + cfg.transformerClassNames = transformerClassNames; + cfg.operation = op; + cfg.enableHiveSync = enableHiveSync; + cfg.sourceOrderingField = sourceOrderingField; + cfg.propsFilePath = UtilitiesTestBase.basePath + "/" + propsFilename; + cfg.sourceLimit = sourceLimit; + cfg.checkpoint = checkpoint; + if (updatePayloadClass) { + cfg.payloadClassName = payloadClassName; + } + if (useSchemaProviderClass) { + cfg.schemaProviderClassName = defaultSchemaProviderClassName; + } + cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange; + return cfg; + } + + static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, WriteOperationType op, + boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) { + HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); + cfg.targetBasePath = basePath; + cfg.targetTableName = "hoodie_trips_copy"; + cfg.tableType = "COPY_ON_WRITE"; + cfg.sourceClassName = HoodieIncrSource.class.getName(); + cfg.operation = op; + cfg.sourceOrderingField = "timestamp"; + cfg.propsFilePath = UtilitiesTestBase.basePath + "/test-downstream-source.properties"; + cfg.sourceLimit = 1000; + if (null != schemaProviderClassName) { + cfg.schemaProviderClassName = schemaProviderClassName; + } + List<String> cfgs = new ArrayList<>(); + cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt); + cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath); + // No partition + cfgs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr"); + cfg.configs = cfgs; + return cfg; + } + + static void assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); + int numCompactionCommits = timeline.countInstants(); + assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); + } + + static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); + int numDeltaCommits = timeline.countInstants(); + assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); + } + + static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); + int numCompactionCommits = timeline.countInstants(); + assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); + } + + static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); + HoodieTimeline timeline = meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); + int numDeltaCommits = timeline.countInstants(); + assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); + } + + static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits) + throws IOException { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + HoodieInstant lastInstant = timeline.lastInstant().get(); + HoodieCommitMetadata commitMetadata = + HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class); + assertEquals(totalCommits, timeline.countInstants()); + assertEquals(expected, commitMetadata.getMetadata(CHECKPOINT_KEY)); + return lastInstant.getTimestamp(); + } + + static void waitTillCondition(Function<Boolean, Boolean> condition, Future dsFuture, long timeoutInSecs) throws Exception { + Future<Boolean> res = Executors.newSingleThreadExecutor().submit(() -> { + boolean ret = false; + while (!ret && !dsFuture.isDone()) { + try { + Thread.sleep(3000); + ret = condition.apply(true); + } catch (Throwable error) { + LOG.warn("Got error :", error); + ret = false; + } + } + return ret; + }); + res.get(timeoutInSecs, TimeUnit.SECONDS); + } + + static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); + HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); + int numDeltaCommits = timeline.countInstants(); + assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); + } + + static void assertAtLeastNReplaceCommits(int minExpected, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); + int numDeltaCommits = timeline.countInstants(); + assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); + } + + static void assertPendingIndexCommit(String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); + int numIndexCommits = timeline.countInstants(); + assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1"); + } + + static void assertCompletedIndexCommit(String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); + int numIndexCommits = timeline.countInstants(); + assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1"); + } + + static void assertNoReplaceCommits(String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); + int numDeltaCommits = timeline.countInstants(); + assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + 0); + } + + static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); + HoodieTimeline timeline = meta.getActiveTimeline().filterPendingReplaceTimeline(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); + int numDeltaCommits = timeline.countInstants(); + assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); + } + } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 2a7db25647e..32af50eee64 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -77,7 +77,6 @@ import org.apache.hudi.utilities.config.SourceTestConfig; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.CsvDFSSource; -import org.apache.hudi.utilities.sources.HoodieIncrSource; import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.sources.JdbcSource; import org.apache.hudi.utilities.sources.JsonKafkaSource; @@ -111,7 +110,6 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.api.java.UDF4; import org.apache.spark.sql.functions; @@ -183,6 +181,18 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { private static final Logger LOG = LoggerFactory.getLogger(TestHoodieDeltaStreamer.class); + private void addRecordMerger(HoodieRecordType type, List<String> hoodieConfig) { + if (type == HoodieRecordType.SPARK) { + Map<String, String> opts = new HashMap<>(); + opts.put(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), HoodieSparkRecordMerger.class.getName()); + opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),"parquet"); + for (Map.Entry<String, String> entry : opts.entrySet()) { + hoodieConfig.add(String.format("%s=%s", entry.getKey(), entry.getValue())); + } + hudiOpts.putAll(opts); + } + } + protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType recordType) throws IOException { return initialHoodieDeltaStreamer(tableBasePath, totalRecords, asyncCluster, recordType, WriteOperationType.INSERT); } @@ -195,7 +205,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType recordType, WriteOperationType writeOperationType, Set<String> customConfigs) throws IOException { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, writeOperationType); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.continuousMode = true; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", asyncCluster, "")); @@ -216,261 +226,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { Boolean retryLastFailedClusteringJob, HoodieRecordType recordType) { HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob); - TestHelpers.addRecordMerger(recordType, scheduleClusteringConfig.configs); + addRecordMerger(recordType, scheduleClusteringConfig.configs); scheduleClusteringConfig.configs.addAll(getAllMultiWriterConfigs()); return new HoodieClusteringJob(jsc, scheduleClusteringConfig); } - static class TestHelpers { - - static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) { - return makeConfig(basePath, op, Collections.singletonList(DropAllTransformer.class.getName())); - } - - static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op) { - return makeConfig(basePath, op, Collections.singletonList(TripsWithDistanceTransformer.class.getName())); - } - - static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames) { - return makeConfig(basePath, op, transformerClassNames, PROPS_FILENAME_TEST_SOURCE, false); - } - - static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames, - String propsFilename, boolean enableHiveSync) { - return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true, - false, null, null); - } - - static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames, - String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, - String payloadClassName, String tableType) { - return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync, - useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp", null); - } - - static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, - List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, - int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, - String checkpoint) { - return makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName, - tableType, sourceOrderingField, checkpoint, false); - } - - static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, - List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, - int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, - String checkpoint, boolean allowCommitOnNoCheckpointChange) { - HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); - cfg.targetBasePath = basePath; - cfg.targetTableName = "hoodie_trips"; - cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType; - cfg.sourceClassName = sourceClassName; - cfg.transformerClassNames = transformerClassNames; - cfg.operation = op; - cfg.enableHiveSync = enableHiveSync; - cfg.sourceOrderingField = sourceOrderingField; - cfg.propsFilePath = UtilitiesTestBase.basePath + "/" + propsFilename; - cfg.sourceLimit = sourceLimit; - cfg.checkpoint = checkpoint; - if (updatePayloadClass) { - cfg.payloadClassName = payloadClassName; - } - if (useSchemaProviderClass) { - cfg.schemaProviderClassName = defaultSchemaProviderClassName; - } - cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange; - return cfg; - } - - static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, WriteOperationType op, - boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) { - HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); - cfg.targetBasePath = basePath; - cfg.targetTableName = "hoodie_trips_copy"; - cfg.tableType = "COPY_ON_WRITE"; - cfg.sourceClassName = HoodieIncrSource.class.getName(); - cfg.operation = op; - cfg.sourceOrderingField = "timestamp"; - cfg.propsFilePath = UtilitiesTestBase.basePath + "/test-downstream-source.properties"; - cfg.sourceLimit = 1000; - if (null != schemaProviderClassName) { - cfg.schemaProviderClassName = schemaProviderClassName; - } - List<String> cfgs = new ArrayList<>(); - cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt); - cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath); - // No partition - cfgs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr"); - cfg.configs = cfgs; - return cfg; - } - - static void addRecordMerger(HoodieRecordType type, List<String> hoodieConfig) { - if (type == HoodieRecordType.SPARK) { - hoodieConfig.add(String.format("%s=%s", HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), HoodieSparkRecordMerger.class.getName())); - hoodieConfig.add(String.format("%s=%s", HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),"parquet")); - } - } - - static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) { - sqlContext.clearCache(); - long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count(); - assertEquals(expected, recordCount); - } - - static Map<String, Long> getPartitionRecordCount(String basePath, SQLContext sqlContext) { - sqlContext.clearCache(); - List<Row> rows = sqlContext.read().format("org.apache.hudi").load(basePath).groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD).count().collectAsList(); - Map<String, Long> partitionRecordCount = new HashMap<>(); - rows.stream().forEach(row -> partitionRecordCount.put(row.getString(0), row.getLong(1))); - return partitionRecordCount; - } - - static void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String partitionToValidate) { - sqlContext.clearCache(); - assertEquals(0, sqlContext.read().format("org.apache.hudi").load(basePath).filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " = " + partitionToValidate).count()); - } - - static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) { - sqlContext.clearCache(); - long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count(); - assertEquals(expected, recordCount); - } - - static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) { - sqlContext.clearCache(); - List<Row> rows = sqlContext.read().format("org.apache.hudi").load(tablePath) - .groupBy("_hoodie_commit_time").count() - .sort("_hoodie_commit_time").collectAsList(); - return rows; - } - - static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) { - sqlContext.clearCache(); - sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips"); - long recordCount = - sqlContext.sql("select * from tmp_trips where haversine_distance is not NULL").count(); - assertEquals(expected, recordCount); - } - - static void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) { - sqlContext.clearCache(); - sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips"); - long recordCount = - sqlContext.sql("select * from tmp_trips where haversine_distance = 1.0").count(); - assertEquals(expected, recordCount); - } - - static void assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); - int numCompactionCommits = timeline.countInstants(); - assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); - } - - static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); - int numDeltaCommits = timeline.countInstants(); - assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); - } - - static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); - int numCompactionCommits = timeline.countInstants(); - assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); - } - - static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); - HoodieTimeline timeline = meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); - int numDeltaCommits = timeline.countInstants(); - assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); - } - - static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits) - throws IOException { - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - HoodieInstant lastInstant = timeline.lastInstant().get(); - HoodieCommitMetadata commitMetadata = - HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class); - assertEquals(totalCommits, timeline.countInstants()); - assertEquals(expected, commitMetadata.getMetadata(CHECKPOINT_KEY)); - return lastInstant.getTimestamp(); - } - - static void waitTillCondition(Function<Boolean, Boolean> condition, Future dsFuture, long timeoutInSecs) throws Exception { - Future<Boolean> res = Executors.newSingleThreadExecutor().submit(() -> { - boolean ret = false; - while (!ret && !dsFuture.isDone()) { - try { - Thread.sleep(3000); - ret = condition.apply(true); - } catch (Throwable error) { - LOG.warn("Got error :", error); - ret = false; - } - } - return ret; - }); - res.get(timeoutInSecs, TimeUnit.SECONDS); - } - - static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); - HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); - int numDeltaCommits = timeline.countInstants(); - assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); - } - - static void assertAtLeastNReplaceCommits(int minExpected, String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); - int numDeltaCommits = timeline.countInstants(); - assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); - } - - static void assertPendingIndexCommit(String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); - int numIndexCommits = timeline.countInstants(); - assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1"); - } - - static void assertCompletedIndexCommit(String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); - int numIndexCommits = timeline.countInstants(); - assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1"); - } - - static void assertNoReplaceCommits(String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); - int numDeltaCommits = timeline.countInstants(); - assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + 0); - } - - static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); - HoodieTimeline timeline = meta.getActiveTimeline().filterPendingReplaceTimeline(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); - int numDeltaCommits = timeline.countInstants(); - assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); - } - } - @Test public void testProps() { TypedProperties props = @@ -696,7 +456,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1); // No new data => no commits. @@ -707,7 +467,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.sourceLimit = 2000; cfg.operation = WriteOperationType.UPSERT; syncAndAssertRecordCount(cfg,1950, tableBasePath, "00001", 2); - List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext); + List<Row> counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); // Perform bootstrap with tableBasePath as source @@ -732,7 +492,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { LOG.info("Schema :"); res.printSchema(); - TestHelpers.assertRecordCount(1950, newDatasetBasePath, sqlContext); + assertRecordCount(1950, newDatasetBasePath, sqlContext); res.registerTempTable("bootstrapped"); assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count()); // NOTE: To fetch record's count Spark will optimize the query fetching minimal possible amount @@ -767,7 +527,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.operation = WriteOperationType.UPSERT; cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval"); assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg,1000,tableBasePath,"00000",1)); - List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext); + List<Row> counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); @@ -776,14 +536,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { newCfg.sourceLimit = 2000; newCfg.operation = WriteOperationType.UPSERT; syncAndAssertRecordCount(newCfg, 1950, tableBasePath, "00001", 2); - List<Row> counts2 = TestHelpers.countsPerCommit(tableBasePath, sqlContext); + List<Row> counts2 = countsPerCommit(tableBasePath, sqlContext); assertEquals(1950, counts2.stream().mapToLong(entry -> entry.getLong(1)).sum()); } private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer expected, String tableBasePath, String metadata, Integer totalCommits) throws Exception { new HoodieDeltaStreamer(cfg, jsc).sync(); - TestHelpers.assertRecordCount(expected, tableBasePath, sqlContext); - TestHelpers.assertDistanceCount(expected, tableBasePath, sqlContext); + assertRecordCount(expected, tableBasePath, sqlContext); + assertDistanceCount(expected, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata(metadata, tableBasePath, fs, totalCommits); } @@ -796,7 +556,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // Insert data produced with Schema A, pass Schema A HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc"); cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc"); cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true"); @@ -804,13 +564,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false"); } new HoodieDeltaStreamer(cfg, jsc).sync(); - TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext); + assertRecordCount(1000, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); // Upsert data produced with Schema B, pass Schema B cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc"); cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc"); cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true"); @@ -819,9 +579,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } new HoodieDeltaStreamer(cfg, jsc).sync(); // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. - TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext); + assertRecordCount(1450, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2); - List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext); + List<Row> counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); sqlContext.read().format("org.apache.hudi").load(tableBasePath).createOrReplaceTempView("tmp_trips"); @@ -835,7 +595,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc"); if (useUserProvidedSchema) { cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc"); @@ -846,9 +606,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true"); new HoodieDeltaStreamer(cfg, jsc).sync(); // again, 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. - TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext); + assertRecordCount(1900, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3); - counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext); + counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build()); @@ -882,14 +642,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { public void testUpsertsCOW_ContinuousModeDisabled(HoodieRecordType recordType) throws Exception { String tableBasePath = basePath + "/non_continuous_cow"; HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true")); cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE")); cfg.continuousMode = false; HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); ds.sync(); - TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext); + assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext); assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown"); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } @@ -913,14 +673,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { public void testUpsertsMOR_ContinuousModeDisabled(HoodieRecordType recordType) throws Exception { String tableBasePath = basePath + "/non_continuous_mor"; HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true")); cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE")); cfg.continuousMode = false; HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); ds.sync(); - TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext); + assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext); assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown"); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } @@ -935,7 +695,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { int totalRecords = 3000; // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.continuousMode = true; if (testShutdownGracefully) { cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName(); @@ -951,8 +711,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } else { TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, fs); } - TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext); - TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext); + assertRecordCount(totalRecords, tableBasePath, sqlContext); + assertDistanceCount(totalRecords, tableBasePath, sqlContext); if (testShutdownGracefully) { TestDataSource.returnEmptyBatch = true; } @@ -1019,7 +779,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.continuousMode = true; cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", "")); @@ -1085,7 +845,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // sync twice and trigger compaction HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); + assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null, dataGenerator, "001"); deltaStreamer.sync(); TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs); @@ -1118,7 +878,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // Step 1 : Prepare and insert data without archival and cleaner. // Make sure that there are 6 commits including 2 replacecommits completed. HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.continuousMode = true; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", "")); @@ -1186,7 +946,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { configs.add(String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName())); } - TestHelpers.addRecordMerger(recordType, configs); + addRecordMerger(recordType, configs); cfg.configs = configs; cfg.continuousMode = false; // timeline as of now. no cleaner and archival kicked in. @@ -1361,7 +1121,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.continuousMode = true; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3")); @@ -1373,7 +1133,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // There should be 4 commits, one of which should be a replace commit TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs); TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); - TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext); + assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } @@ -1392,7 +1152,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.continuousMode = true; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3")); @@ -1404,7 +1164,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // There should be 4 commits, one of which should be a replace commit TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs); TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); - TestHelpers.assertDistinctRecordCount(1900, tableBasePath, sqlContext); + assertDistinctRecordCount(1900, tableBasePath, sqlContext); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } @@ -1418,7 +1178,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.continuousMode = true; cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3")); @@ -1431,7 +1191,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // There should be 4 commits, one of which should be a replace commit TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs); TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); - TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext); + assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } @@ -1443,7 +1203,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // ingest data int totalRecords = 3000; HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.continuousMode = false; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "false", "0", "false", "0")); @@ -1548,32 +1308,32 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // Initial bulk insert to ingest to first hudi table HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); // NOTE: We should not have need to set below config, 'datestr' should have assumed date partitioning cfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day"); new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync(); - TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext); - TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext); - TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext); + assertRecordCount(1000, tableBasePath, sqlContext); + assertDistanceCount(1000, tableBasePath, sqlContext); + assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext); String lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); // Now incrementally pull from the above hudi table and ingest to second table HoodieDeltaStreamer.Config downstreamCfg = TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, null); - TestHelpers.addRecordMerger(recordType, downstreamCfg.configs); + addRecordMerger(recordType, downstreamCfg.configs); new HoodieDeltaStreamer(downstreamCfg, jsc, fs, hiveServer.getHiveConf()).sync(); - TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext); - TestHelpers.assertDistanceCount(1000, downstreamTableBasePath, sqlContext); - TestHelpers.assertDistanceCountWithExactValue(1000, downstreamTableBasePath, sqlContext); + assertRecordCount(1000, downstreamTableBasePath, sqlContext); + assertDistanceCount(1000, downstreamTableBasePath, sqlContext); + assertDistanceCountWithExactValue(1000, downstreamTableBasePath, sqlContext); TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, fs, 1); // No new data => no commits for upstream table cfg.sourceLimit = 0; new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync(); - TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext); - TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext); - TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext); + assertRecordCount(1000, tableBasePath, sqlContext); + assertDistanceCount(1000, tableBasePath, sqlContext); + assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); // with no change in upstream table, no change in downstream too when pulled. @@ -1581,35 +1341,35 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, DummySchemaProvider.class.getName()); new HoodieDeltaStreamer(downstreamCfg1, jsc).sync(); - TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext); - TestHelpers.assertDistanceCount(1000, downstreamTableBasePath, sqlContext); - TestHelpers.assertDistanceCountWithExactValue(1000, downstreamTableBasePath, sqlContext); + assertRecordCount(1000, downstreamTableBasePath, sqlContext); + assertDistanceCount(1000, downstreamTableBasePath, sqlContext); + assertDistanceCountWithExactValue(1000, downstreamTableBasePath, sqlContext); TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, fs, 1); // upsert() #1 on upstream hudi table cfg.sourceLimit = 2000; cfg.operation = WriteOperationType.UPSERT; new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync(); - TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext); - TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext); - TestHelpers.assertDistanceCountWithExactValue(1950, tableBasePath, sqlContext); + assertRecordCount(1950, tableBasePath, sqlContext); + assertDistanceCount(1950, tableBasePath, sqlContext); + assertDistanceCountWithExactValue(1950, tableBasePath, sqlContext); lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2); - List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext); + List<Row> counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); // Incrementally pull changes in upstream hudi table and apply to downstream table downstreamCfg = TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.UPSERT, false, null); - TestHelpers.addRecordMerger(recordType, downstreamCfg.configs); + addRecordMerger(recordType, downstreamCfg.configs); downstreamCfg.sourceLimit = 2000; new HoodieDeltaStreamer(downstreamCfg, jsc).sync(); - TestHelpers.assertRecordCount(2000, downstreamTableBasePath, sqlContext); - TestHelpers.assertDistanceCount(2000, downstreamTableBasePath, sqlContext); - TestHelpers.assertDistanceCountWithExactValue(2000, downstreamTableBasePath, sqlContext); + assertRecordCount(2000, downstreamTableBasePath, sqlContext); + assertDistanceCount(2000, downstreamTableBasePath, sqlContext); + assertDistanceCountWithExactValue(2000, downstreamTableBasePath, sqlContext); String finalInstant = TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, fs, 2); - counts = TestHelpers.countsPerCommit(downstreamTableBasePath, sqlContext); + counts = countsPerCommit(downstreamTableBasePath, sqlContext); assertEquals(2000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); // Test Hive integration @@ -1648,7 +1408,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, "MERGE_ON_READ"); new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync(); - TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext); + assertRecordCount(1000, dataSetBasePath, sqlContext); //now create one more deltaStreamer instance and update payload class cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, @@ -1674,7 +1434,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ"); new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync(); - TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext); + assertRecordCount(1000, dataSetBasePath, sqlContext); //now assert that hoodie.properties file now has updated payload class name Properties props = new Properties(); @@ -1693,7 +1453,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, null); new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync(); - TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext); + assertRecordCount(1000, dataSetBasePath, sqlContext); //now create one more deltaStreamer instance and update payload class cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, @@ -1719,9 +1479,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); new HoodieDeltaStreamer(cfg, jsc).sync(); - TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext); + assertRecordCount(1000, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); // Generate the same 1000 records + 1000 new ones for upsert @@ -1729,10 +1489,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.sourceLimit = 2000; cfg.operation = WriteOperationType.INSERT; new HoodieDeltaStreamer(cfg, jsc).sync(); - TestHelpers.assertRecordCount(2000, tableBasePath, sqlContext); + assertRecordCount(2000, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2); // 1000 records for commit 00000 & 1000 for commit 00001 - List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext); + List<Row> counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1000, counts.get(0).getLong(1)); assertEquals(1000, counts.get(1).getLong(1)); @@ -1740,7 +1500,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { HoodieTableMetaClient mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get(); HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT); - TestHelpers.addRecordMerger(recordType, cfg2.configs); + addRecordMerger(recordType, cfg2.configs); cfg2.filterDupes = false; cfg2.sourceLimit = 2000; cfg2.operation = WriteOperationType.UPSERT; @@ -1817,13 +1577,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); + assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); if (testEmptyBatch) { prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null); deltaStreamer.sync(); // since we mimic'ed empty batch, total records should be same as first sync(). - TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); + assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build(); // validate table schema fetches valid schema from last but one commit. @@ -1834,7 +1594,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // proceed w/ non empty batch. prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, null); deltaStreamer.sync(); - TestHelpers.assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext); + assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext); // validate commit metadata for all completed commits to have valid schema in extra metadata. HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build(); metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> assertValidSchemaInCommitMetadata(entry, metaClient)); @@ -1875,7 +1635,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { transformerClassNames, PROPS_FILENAME_TEST_ORC, false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(ORC_NUM_RECORDS, tableBasePath, sqlContext); + assertRecordCount(ORC_NUM_RECORDS, tableBasePath, sqlContext); testNum++; } @@ -1925,7 +1685,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false, true, 100000, false, null, null, "timestamp", null), jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(parquetRecords, tableBasePath, sqlContext); + assertRecordCount(parquetRecords, tableBasePath, sqlContext); deltaStreamer.shutdownGracefully(); // prep json kafka source @@ -1940,13 +1700,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { deltaStreamer.sync(); // if auto reset value is set to LATEST, this all kafka records so far may not be synced. int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS); - TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext); + assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext); // verify 2nd batch to test LATEST auto reset value. prepareJsonKafkaDFSFiles(20, false, topicName); totalExpectedRecords += 20; deltaStreamer.sync(); - TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext); + assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext); testNum++; } @@ -1961,14 +1721,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false, true, 100000, false, null, null, "timestamp", null), jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext); + assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext); int totalRecords = JSON_KAFKA_NUM_RECORDS; int records = 10; totalRecords += records; prepareJsonKafkaDFSFiles(records, false, topicName); deltaStreamer.sync(); - TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext); + assertRecordCount(totalRecords, tableBasePath, sqlContext); } @Test @@ -2022,7 +1782,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { true, 100000, false, null, null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext); + assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext); prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName); deltaStreamer = new HoodieDeltaStreamer( @@ -2031,7 +1791,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { true, 100000, false, null, null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext); + assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext); } @Disabled("HUDI-6609") @@ -2055,7 +1815,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { //parquetCfg.continuousMode = false; HoodieDeltaStreamer parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc); parquetDs.sync(); - TestHelpers.assertRecordCount(100, tableBasePath, sqlContext); + assertRecordCount(100, tableBasePath, sqlContext); // prep json kafka source topicName = "topic" + testNum; @@ -2070,13 +1830,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { true, Integer.MAX_VALUE, false, null, null, "timestamp", null), jsc); kafkaDs.sync(); int totalExpectedRecords = parquetRecords + 20; - TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext); + assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext); //parquet again prepareParquetDFSUpdates(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA, dataGenerator, "001"); parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc); parquetDs.sync(); - TestHelpers.assertRecordCount(parquetRecords * 2 + 20, tableBasePath, sqlContext); + assertRecordCount(parquetRecords * 2 + 20, tableBasePath, sqlContext); HoodieTableMetaClient metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), tableBasePath); List<HoodieInstant> instants = metaClient.getCommitsTimeline().getInstants(); @@ -2172,7 +1932,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { null, PROPS_FILENAME_TEST_PARQUET, false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); + assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); } else { assertThrows(HoodieIOException.class, () -> new HoodieDeltaStreamer( TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), @@ -2266,7 +2026,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { transformerClassNames, PROPS_FILENAME_TEST_CSV, false, useSchemaProvider, 1000, false, null, null, sourceOrderingField, null), jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath, sqlContext); + assertRecordCount(CSV_NUM_RECORDS, tableBasePath, sqlContext); testNum++; } @@ -2386,7 +2146,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { Collections.emptyList(), PROPS_FILENAME_TEST_SQL_SOURCE, false, false, 1000, false, null, null, "timestamp", null, true), jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext); + assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext); } @Disabled @@ -2420,7 +2180,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(deltaStreamer, cfg, (r) -> { TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + ((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath, fs); - TestHelpers.assertRecordCount(numRecords, tableBasePath, sqlContext); + assertRecordCount(numRecords, tableBasePath, sqlContext); return true; }); } catch (Exception e) { @@ -2443,7 +2203,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { insertInTable(tableBasePath, 9, WriteOperationType.UPSERT); //No change as this fails with Path not exist error assertThrows(HoodieIncrementalPathNotFoundException.class, () -> new HoodieDeltaStreamer(downstreamCfg, jsc).sync()); - TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext); + assertRecordCount(1000, downstreamTableBasePath, sqlContext); if (downstreamCfg.configs == null) { downstreamCfg.configs = new ArrayList<>(); @@ -2506,7 +2266,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { null, PROPS_FILENAME_TEST_PARQUET, false, false, 100000, false, null, null, "timestamp", null), jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext); + assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext); testNum++; prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT); @@ -2518,7 +2278,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { false, 100000, false, null, null, "timestamp", null), jsc); deltaStreamer.sync(); // No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION. - TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); // There should not be any fileIDs in the deleted partition assertTrue(getAllFileIDsInTable(tableBasePath, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty()); @@ -2544,10 +2304,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType, HoodieRecordType recordType) throws Exception { // Initial insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); new HoodieDeltaStreamer(cfg, jsc).sync(); - TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext); - TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext); + assertRecordCount(1000, tableBasePath, sqlContext); + assertDistanceCount(1000, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); // Collect the fileIds before running HoodieDeltaStreamer @@ -2560,8 +2320,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { new HoodieDeltaStreamer(cfg, jsc).sync(); if (operationType == WriteOperationType.INSERT_OVERWRITE) { - TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext); - TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext); + assertRecordCount(1000, tableBasePath, sqlContext); + assertDistanceCount(1000, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build(); @@ -2576,8 +2336,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.sourceLimit = 1000; new HoodieDeltaStreamer(cfg, jsc).sync(); - TestHelpers.assertRecordCount(950, tableBasePath, sqlContext); - TestHelpers.assertDistanceCount(950, tableBasePath, sqlContext); + assertRecordCount(950, tableBasePath, sqlContext); + assertDistanceCount(950, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } @@ -2621,7 +2381,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { String tableBasePath = basePath + "/test_drop_partition_columns" + testNum++; // ingest data with dropping partition columns enabled HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.configs.add(String.format("%s=%s", HoodieTableConfig.DROP_PARTITION_COLUMNS.key(), "true")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); ds.sync(); @@ -2651,7 +2411,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.forceEmptyMetaSync = true; new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync(); - TestHelpers.assertRecordCount(0, tableBasePath, sqlContext); + assertRecordCount(0, tableBasePath, sqlContext); // make sure hive table is present HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips"); @@ -2667,7 +2427,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // default table type is COW HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); new HoodieDeltaStreamer(cfg, jsc).sync(); - TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext); + assertRecordCount(1000, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs); @@ -2690,9 +2450,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); new HoodieDeltaStreamer(cfg, jsc).sync(); // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. - TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext); + assertRecordCount(1450, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2); - List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext); + List<Row> counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs); // currently there should be 1 deltacommits now @@ -2702,9 +2462,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { new HoodieDeltaStreamer(cfg, jsc).sync(); // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. // total records should be 1900 now - TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext); + assertRecordCount(1900, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3); - counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext); + counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs); // currently there should be 2 deltacommits now @@ -2731,11 +2491,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { useSchemaProvider, 100000, false, null, null, "timestamp", null); HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc); deltaStreamer.sync(); - TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); + assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); prepareParquetDFSFiles(200, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null); deltaStreamer.sync(); - TestHelpers.assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext); + assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext); testNum++; } @@ -2746,7 +2506,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { String tableBasePath = basePath + String.format("/configurationHotUpdate_%s_%s", tableType.name(), recordType.name()); HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); - TestHelpers.addRecordMerger(recordType, cfg.configs); + addRecordMerger(recordType, cfg.configs); cfg.continuousMode = true; cfg.tableType = tableType.name(); cfg.configHotUpdateStrategyClass = MockConfigurationHotUpdateStrategy.class.getName(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java index 528a69a7e91..53e1733c9a6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java @@ -86,14 +86,14 @@ public class TestHoodieDeltaStreamerDAGExecution extends HoodieDeltaStreamerTest PARQUET_SOURCE_ROOT, false, "partition_path", ""); String tableBasePath = basePath + "/runDeltaStreamer" + testNum; FileIOUtils.deleteDirectory(new File(tableBasePath)); - HoodieDeltaStreamer.Config config = TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, operationType, + HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, operationType, ParquetDFSSource.class.getName(), null, PROPS_FILENAME_TEST_PARQUET, false, useSchemaProvider, 100000, false, null, HoodieTableType.MERGE_ON_READ.name(), "timestamp", null); configsOpt.ifPresent(cfgs -> config.configs.addAll(cfgs)); HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc); deltaStreamer.sync(); - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); + assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); testNum++; if (shouldGenerateUpdates) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java index 8a95be0b6cd..e59d23685e7 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java @@ -26,11 +26,11 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; -import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.config.SourceTestConfig; import org.apache.hudi.utilities.sources.TestDataSource; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; @@ -38,12 +38,14 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.hadoop.fs.FileSystem; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.net.URI; import java.nio.file.Paths; @@ -62,37 +64,40 @@ import static org.apache.hudi.config.HoodieWriteConfig.FINALIZE_WRITE_PARALLELIS import static org.apache.hudi.config.HoodieWriteConfig.INSERT_PARALLELISM_VALUE; import static org.apache.hudi.config.HoodieWriteConfig.UPSERT_PARALLELISM_VALUE; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; -import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER; -import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.addCommitToTimeline; -import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName; -import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.prepareInitialConfigs; import static org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer.deltaStreamerTestRunner; -public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness { +public class TestHoodieDeltaStreamerWithMultiWriter extends HoodieDeltaStreamerTestBase { private static final Logger LOG = LoggerFactory.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class); String basePath; String propsFilePath; String tableBasePath; - + + @BeforeEach + public void setup() throws Exception { + basePath = UtilitiesTestBase.basePath; + super.setupTest(); + } + @AfterEach public void teardown() throws Exception { TestDataSource.resetDataGen(); + FileIOUtils.deleteDirectory(new File(basePath)); } @ParameterizedTest @EnumSource(HoodieTableType.class) void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception { // NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts - basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString(); + basePath = Paths.get(URI.create(basePath.replaceAll("/$", ""))).toString(); propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER; - tableBasePath = basePath + "/testtable_" + tableType; - prepareInitialConfigs(fs(), basePath, "foo"); - TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); + tableBasePath = basePath + "/testUpsertsContinuousModeWithMultipleWritersForConflicts_" + tableType; + prepareInitialConfigs(fs, basePath, "foo"); + TypedProperties props = prepareMultiWriterProps(fs, basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider"); props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); - UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); + UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath); // Keep it higher than batch-size to test continuous mode int totalRecords = 3000; @@ -106,18 +111,18 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona prepJobConfig.configs.add(String.format("%s=3", HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key())); prepJobConfig.configs.add(String.format("%s=0", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key())); } - HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc()); + HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc); // Prepare base dataset with some commits deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs()); - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs()); + TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs); + TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs); } else { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs()); + TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs); } - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext()); - TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext()); + assertRecordCount(totalRecords, tableBasePath, sqlContext); + assertDistanceCount(totalRecords, tableBasePath, sqlContext); return true; }); @@ -131,17 +136,17 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); cfgBackfillJob.continuousMode = false; - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tableBasePath).build(); HoodieTimeline timeline = meta.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY); cfgBackfillJob.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords)); cfgBackfillJob.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key())); - HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc()); + HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc); // re-init ingestion job to start sync service - HoodieDeltaStreamer ingestionJob2 = new HoodieDeltaStreamer(cfgIngestionJob, jsc()); + HoodieDeltaStreamer ingestionJob2 = new HoodieDeltaStreamer(cfgIngestionJob, jsc); // run ingestion & backfill in parallel, create conflict and fail one runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2, @@ -152,14 +157,14 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona @EnumSource(HoodieTableType.class) void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType tableType) throws Exception { // NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts - basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString(); + basePath = Paths.get(URI.create(basePath.replaceAll("/$", ""))).toString(); propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER; - tableBasePath = basePath + "/testtable_" + tableType; - prepareInitialConfigs(fs(), basePath, "foo"); - TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); + tableBasePath = basePath + "/testUpsertsContinuousModeWithMultipleWritersWithoutConflicts_" + tableType; + prepareInitialConfigs(fs, basePath, "foo"); + TypedProperties props = prepareMultiWriterProps(fs, basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider"); props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); - UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); + UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath); // Keep it higher than batch-size to test continuous mode int totalRecords = 3000; @@ -168,31 +173,31 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona prepJobConfig.continuousMode = true; prepJobConfig.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords)); prepJobConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key())); - HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc()); + HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc); // Prepare base dataset with some commits deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs()); - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs()); + TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs); + TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs); } else { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs()); + TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs); } - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext()); - TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext()); + assertRecordCount(totalRecords, tableBasePath, sqlContext); + assertDistanceCount(totalRecords, tableBasePath, sqlContext); return true; }); // create new ingestion & backfill job config to generate only INSERTS to avoid conflict - props = prepareMultiWriterProps(fs(), basePath, propsFilePath); + props = prepareMultiWriterProps(fs, basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider"); props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); props.setProperty("hoodie.test.source.generate.inserts", "true"); - UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER); + UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER); HoodieDeltaStreamer.Config cfgBackfillJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT, propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName())); cfgBackfillJob2.continuousMode = false; - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tableBasePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); @@ -206,9 +211,9 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona cfgIngestionJob2.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords)); cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key())); // re-init ingestion job - HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob2, jsc()); + HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob2, jsc); // re-init backfill job - HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob2, jsc()); + HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob2, jsc); // run ingestion & backfill in parallel, avoid conflict and succeed both runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3, @@ -220,14 +225,14 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"}) void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception { // NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts - basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString(); + basePath = Paths.get(URI.create(basePath.replaceAll("/$", ""))).toString(); propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER; - tableBasePath = basePath + "/testtable_" + tableType; - prepareInitialConfigs(fs(), basePath, "foo"); - TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); + tableBasePath = basePath + "/testLatestCheckpointCarryOverWithMultipleWriters_" + tableType; + prepareInitialConfigs(fs, basePath, "foo"); + TypedProperties props = prepareMultiWriterProps(fs, basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider"); props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); - UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); + UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath); // Keep it higher than batch-size to test continuous mode int totalRecords = 3000; @@ -236,18 +241,18 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona prepJobConfig.continuousMode = true; prepJobConfig.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords)); prepJobConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key())); - HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc()); + HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc); // Prepare base dataset with some commits deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs()); - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs()); + TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs); + TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs); } else { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs()); + TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs); } - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext()); - TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext()); + assertRecordCount(totalRecords, tableBasePath, sqlContext); + assertDistanceCount(totalRecords, tableBasePath, sqlContext); return true; }); @@ -255,17 +260,17 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); cfgBackfillJob.continuousMode = false; - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tableBasePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieCommitMetadata commitMetadataForFirstInstant = HoodieCommitMetadata .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); // run the backfill job - props = prepareMultiWriterProps(fs(), basePath, propsFilePath); + props = prepareMultiWriterProps(fs, basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider"); props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); - UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); + UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath); // get current checkpoint after preparing base dataset with some commits HoodieCommitMetadata commitMetadataForLastInstant = getLatestMetadata(meta); @@ -274,7 +279,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona cfgBackfillJob.checkpoint = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY); cfgBackfillJob.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords)); cfgBackfillJob.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key())); - HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc()); + HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc); backfillJob.sync(); meta.reloadActiveTimeline(); @@ -286,7 +291,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona verifyCommitMetadataCheckpoint(meta, null); cfgBackfillJob.checkpoint = null; - new HoodieDeltaStreamer(cfgBackfillJob, jsc()).sync(); // if deltastreamer checkpoint fetch does not walk back to older commits, this sync will fail + new HoodieDeltaStreamer(cfgBackfillJob, jsc).sync(); // if deltastreamer checkpoint fetch does not walk back to older commits, this sync will fail meta.reloadActiveTimeline(); Assertions.assertEquals(totalCommits + 2, meta.getCommitsTimeline().filterCompletedInstants().countInstants()); verifyCommitMetadataCheckpoint(meta, "00008"); @@ -309,8 +314,8 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona private static TypedProperties prepareMultiWriterProps(FileSystem fs, String basePath, String propsFilePath) throws IOException { TypedProperties props = new TypedProperties(); - HoodieDeltaStreamerTestBase.populateCommonProps(props, basePath); - HoodieDeltaStreamerTestBase.populateCommonHiveProps(props); + populateCommonProps(props, basePath); + populateCommonHiveProps(props); props.setProperty("include", "sql-transformer.properties"); props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); @@ -362,18 +367,18 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob, HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict, String jobId) throws Exception { ExecutorService service = Executors.newFixedThreadPool(2); - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tableBasePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); String lastSuccessfulCommit = timeline.lastInstant().get().getTimestamp(); // Condition for parallel ingestion job Function<Boolean, Boolean> conditionForRegularIngestion = (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, fs()); + TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, fs); } else { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, fs()); + TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, fs); } - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext()); - TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext()); + assertRecordCount(totalRecords, tableBasePath, sqlContext); + assertDistanceCount(totalRecords, tableBasePath, sqlContext); return true; }; @@ -445,7 +450,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona GetCommitsAfterInstant(String basePath, String lastSuccessfulCommit) { this.basePath = basePath; this.lastSuccessfulCommit = lastSuccessfulCommit; - meta = HoodieTableMetaClient.builder().setConf(fs().getConf()).setBasePath(basePath).build(); + meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build(); } long getCommitsAfterInstant() { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java index 9c858dd475a..a8ee0c694fd 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java @@ -193,8 +193,8 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath; streamer.sync(); - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1, sqlContext); - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2, sqlContext); + assertRecordCount(5, targetBasePath1, sqlContext); + assertRecordCount(10, targetBasePath2, sqlContext); //insert updates for already existing records in kafka topics testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA))); @@ -209,8 +209,8 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa assertTrue(streamer.getFailedTables().isEmpty()); //assert the record count matches now - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1, sqlContext); - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2, sqlContext); + assertRecordCount(5, targetBasePath1, sqlContext); + assertRecordCount(10, targetBasePath2, sqlContext); testNum++; } @@ -307,7 +307,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa private void syncAndVerify(HoodieMultiTableDeltaStreamer streamer, String targetBasePath1, String targetBasePath2, long table1ExpectedRecords, long table2ExpectedRecords) { streamer.sync(); - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table1ExpectedRecords, targetBasePath1, sqlContext); - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table2ExpectedRecords, targetBasePath2, sqlContext); + assertRecordCount(table1ExpectedRecords, targetBasePath1, sqlContext); + assertRecordCount(table2ExpectedRecords, targetBasePath2, sqlContext); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java index e941aff8c04..888f5ebc2de 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java @@ -59,7 +59,7 @@ public class TestTransformer extends HoodieDeltaStreamerTestBase { PARQUET_SOURCE_ROOT, false, "partition_path", ""); String tableBasePath = basePath + "/testMultipleTransformersWithIdentifiers" + testNum; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( - TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), + HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc); @@ -78,7 +78,7 @@ public class TestTransformer extends HoodieDeltaStreamerTestBase { properties.setProperty("transformer.suffix", ".1,.2,.3"); deltaStreamer.sync(); - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); + assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); assertEquals(0, sqlContext.read().format("org.apache.hudi").load(tableBasePath).where("timestamp != 110").count()); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index 058ed72a3be..24f645c404a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -118,6 +118,7 @@ public class UtilitiesTestBase { protected static HoodieSparkEngineContext context; protected static SparkSession sparkSession; protected static SQLContext sqlContext; + protected static Configuration hadoopConf; @BeforeAll public static void setLogLevel() { @@ -131,7 +132,7 @@ public class UtilitiesTestBase { } public static void initTestServices(boolean needsHdfs, boolean needsHive, boolean needsZookeeper) throws Exception { - final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); + hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); hadoopConf.set("hive.exec.scratchdir", System.getenv("java.io.tmpdir") + "/hive"); if (needsHdfs) {
