This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7fe35fb895 [HUDI-5145] Avoid starting HDFS in hudi-utilities tests (#7171)
7fe35fb895 is described below
commit 7fe35fb895fb58f0ea599974defd7ffe310a964e
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Nov 10 12:23:24 2022 +0800
[HUDI-5145] Avoid starting HDFS in hudi-utilities tests (#7171)
---
.../testutils/minicluster/HdfsTestService.java | 8 +-
.../TestDFSHoodieTestSuiteWriterAdapter.java | 8 +-
.../integ/testsuite/TestFileDeltaInputWriter.java | 10 +-
.../testsuite/job/TestHoodieTestSuiteJob.java | 74 ++---
.../reader/TestDFSAvroDeltaInputReader.java | 10 +-
.../reader/TestDFSHoodieDatasetInputReader.java | 6 +-
.../functional/HoodieDeltaStreamerTestBase.java | 10 +-
.../functional/TestHoodieDeltaStreamer.java | 314 ++++++++++-----------
.../TestHoodieMultiTableDeltaStreamer.java | 40 +--
.../hudi/utilities/sources/TestAvroDFSSource.java | 4 +-
.../hudi/utilities/sources/TestCsvDFSSource.java | 4 +-
.../utilities/sources/TestGcsEventsSource.java | 2 +-
.../hudi/utilities/sources/TestJsonDFSSource.java | 4 +-
.../utilities/sources/TestParquetDFSSource.java | 2 +-
.../hudi/utilities/sources/TestS3EventsSource.java | 4 +-
.../hudi/utilities/sources/TestSqlSource.java | 11 +-
.../debezium/TestAbstractDebeziumSource.java | 2 +-
.../utilities/testutils/UtilitiesTestBase.java | 61 ++--
.../AbstractCloudObjectsSourceTestBase.java | 2 +-
.../sources/AbstractDFSSourceTestBase.java | 6 +-
.../transform/TestSqlFileBasedTransformer.java | 23 +-
21 files changed, 313 insertions(+), 292 deletions(-)
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
index 0766c61c67..727e1e4db6 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
@@ -53,8 +53,12 @@ public class HdfsTestService {
private MiniDFSCluster miniDfsCluster;
public HdfsTestService() throws IOException {
- hadoopConf = new Configuration();
- workDir = Files.createTempDirectory("temp").toAbsolutePath().toString();
+ this(new Configuration());
+ }
+
+ public HdfsTestService(Configuration hadoopConf) throws IOException {
+ this.hadoopConf = hadoopConf;
+ this.workDir = Files.createTempDirectory("temp").toAbsolutePath().toString();
}
public Configuration getHadoopConf() {
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
index 2b69a319a5..9c21ee6bd4 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
@@ -65,7 +65,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices(true, false, false);
}
@AfterAll
@@ -131,15 +131,15 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends
UtilitiesTestBase {
// TODO(HUDI-3668): Fix this test
public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws
IOException {
DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS,
DeltaInputType.AVRO,
- new SerializableConfiguration(jsc.hadoopConfiguration()), dfsBasePath,
dfsBasePath,
+ new SerializableConfiguration(jsc.hadoopConfiguration()), basePath,
basePath,
schemaProvider.getSourceSchema().toString(), 10240L,
jsc.defaultParallelism(), false, false);
DeltaWriterAdapter<GenericRecord> dfsDeltaWriterAdapter =
DeltaWriterFactory
.getDeltaWriterAdapter(dfsSinkConfig, 1);
FlexibleSchemaRecordGenerationIterator itr = new
FlexibleSchemaRecordGenerationIterator(1000,
schemaProvider.getSourceSchema().toString());
dfsDeltaWriterAdapter.write(itr);
- FileSystem fs = FSUtils.getFs(dfsBasePath, jsc.hadoopConfiguration());
- FileStatus[] fileStatuses = fs.listStatus(new Path(dfsBasePath));
+ FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
+ FileStatus[] fileStatuses = fs.listStatus(new Path(basePath));
// Since maxFileSize was 10240L and we produced 1K records each close to
1K size, we should produce more than
// 1 file
assertTrue(fileStatuses.length > 0);
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
index f3cda10a62..36c88d0242 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
@@ -58,7 +58,7 @@ public class TestFileDeltaInputWriter extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices(true, false, false);
}
@AfterAll
@@ -82,7 +82,7 @@ public class TestFileDeltaInputWriter extends
UtilitiesTestBase {
public void testAvroFileSinkWriter() throws IOException {
// 1. Create a Avro File Sink Writer
DeltaInputWriter<GenericRecord> fileSinkWriter =
- new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), dfsBasePath +
"/input", schemaProvider.getSourceSchema()
+ new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), basePath +
"/input", schemaProvider.getSourceSchema()
.toString(), 1024 * 1024L);
GenericRecordFullPayloadGenerator payloadGenerator =
new
GenericRecordFullPayloadGenerator(schemaProvider.getSourceSchema());
@@ -96,7 +96,7 @@ public class TestFileDeltaInputWriter extends
UtilitiesTestBase {
});
fileSinkWriter.close();
DeltaWriteStats deltaWriteStats = fileSinkWriter.getDeltaWriteStats();
- FileSystem fs = FSUtils.getFs(dfsBasePath, jsc.hadoopConfiguration());
+ FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new
Path(deltaWriteStats.getFilePath()));
// Atleast 1 file was written
assertEquals(1, fileStatuses.length);
@@ -104,7 +104,7 @@ public class TestFileDeltaInputWriter extends
UtilitiesTestBase {
assertTrue(fileStatuses[0].getLen() > 0);
// File length should be the same as the number of bytes written
assertTrue(deltaWriteStats.getBytesWritten() > 0);
- List<String> paths = Arrays.asList(fs.globStatus(new Path(dfsBasePath +
"/*/*.avro")))
+ List<String> paths = Arrays.asList(fs.globStatus(new Path(basePath +
"/*/*.avro")))
.stream().map(f ->
f.getPath().toString()).collect(Collectors.toList());
JavaRDD<GenericRecord> writtenRecords =
SparkBasedReader.readAvro(sparkSession,
schemaProvider.getSourceSchema().toString(), paths, Option.empty(),
@@ -119,7 +119,7 @@ public class TestFileDeltaInputWriter extends
UtilitiesTestBase {
public void testAvroFileSinkCreateNewWriter() throws IOException {
// 1. Create a Avro File Sink Writer
DeltaInputWriter<GenericRecord> fileSinkWriter =
- new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), dfsBasePath,
+ new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), basePath,
schemaProvider.getSourceSchema().toString(),
1024 * 1024L);
GenericRecordFullPayloadGenerator payloadGenerator =
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
index ddf5b07247..0ce0076f88 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
@@ -94,30 +94,30 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(true, true);
+ UtilitiesTestBase.initTestServices(true, true, true);
// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
+ "/.."
- + BASE_PROPERTIES_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath +
"/base.properties");
+ + BASE_PROPERTIES_DOCKER_DEMO_RELATIVE_PATH, fs, basePath +
"/base.properties");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
+ "/.."
- + SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath +
"/source.avsc");
+ + SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, fs, basePath +
"/source.avsc");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
+ "/.."
- + TARGET_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath +
"/target.avsc");
+ + TARGET_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, fs, basePath +
"/target.avsc");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
+ "/.."
- + COW_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + COW_DAG_FILE_NAME);
+ + COW_DAG_SOURCE_PATH, fs, basePath + "/" + COW_DAG_FILE_NAME);
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
+ "/.."
- + MOR_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + MOR_DAG_FILE_NAME);
+ + MOR_DAG_SOURCE_PATH, fs, basePath + "/" + MOR_DAG_FILE_NAME);
TypedProperties props = getProperties();
- UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath +
"/test-source"
+ UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath +
"/test-source"
+ ".properties");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
+ "/.."
- + COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH, dfs, dfsBasePath + "/"
+ COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES);
- UtilitiesTestBase.Helpers.savePropsToDFS(getProperties(), dfs, dfsBasePath
+ "/test-source"
+ + COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH, fs, basePath + "/" +
COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES);
+ UtilitiesTestBase.Helpers.savePropsToDFS(getProperties(), fs, basePath +
"/test-source"
+ ".properties");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
+ "/.."
- + SPARK_SQL_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" +
SPARK_SQL_DAG_FILE_NAME);
+ + SPARK_SQL_DAG_SOURCE_PATH, fs, basePath + "/" +
SPARK_SQL_DAG_FILE_NAME);
// Properties used for the delta-streamer which incrementally pulls from
upstream DFS Avro source and
// writes to downstream hudi table
@@ -127,10 +127,10 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field",
"timestamp");
// Source schema is the target schema of upstream table
-
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source.avsc");
-
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/source.avsc");
- UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs,
- dfsBasePath + "/test-downstream-source.properties");
+
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
basePath + "/source.avsc");
+
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
basePath + "/source.avsc");
+ UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, fs,
+ basePath + "/test-downstream-source.properties");
// these tests cause a lot of log verbosity from spark, turning it down
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
}
@@ -151,8 +151,8 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
}
private void cleanDFSDirs() throws Exception {
- dfs.delete(new Path(dfsBasePath + "/input"), true);
- dfs.delete(new Path(dfsBasePath + "/result"), true);
+ fs.delete(new Path(basePath + "/input"), true);
+ fs.delete(new Path(basePath + "/result"), true);
}
private static TypedProperties getProperties() {
@@ -161,9 +161,9 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
props.setProperty("hoodie.datasource.write.partitionpath.field",
"timestamp");
props.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type",
"UNIX_TIMESTAMP");
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat",
"yyyy/MM/dd");
-
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source.avsc");
-
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/source.avsc");
- props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath +
"/input");
+
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
basePath + "/source.avsc");
+
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
basePath + "/source.avsc");
+ props.setProperty("hoodie.deltastreamer.source.dfs.root", basePath +
"/input");
props.setProperty("hoodie.datasource.hive_sync.assume_date_partitioning",
"true");
props.setProperty("hoodie.datasource.hive_sync.skip_ro_suffix", "true");
props.setProperty("hoodie.datasource.write.keytranslator.class",
"org.apache.hudi"
@@ -205,8 +205,8 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
@MethodSource("configParams")
public void testDagWithInsertUpsertAndValidate(boolean useDeltaStreamer,
String tableType) throws Exception {
this.cleanDFSDirs();
- String inputBasePath = dfsBasePath + "/input/" +
UUID.randomUUID().toString();
- String outputBasePath = dfsBasePath + "/result/" +
UUID.randomUUID().toString();
+ String inputBasePath = basePath + "/input/" + UUID.randomUUID().toString();
+ String outputBasePath = basePath + "/result/" +
UUID.randomUUID().toString();
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath,
useDeltaStreamer, tableType);
cfg.workloadDagGenerator = ComplexDagGenerator.class.getName();
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
@@ -220,8 +220,8 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
boolean useDeltaStreamer = false;
String tableType = "COPY_ON_WRITE";
this.cleanDFSDirs();
- String inputBasePath = dfsBasePath + "/input";
- String outputBasePath = dfsBasePath + "/result";
+ String inputBasePath = basePath + "/input";
+ String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath,
useDeltaStreamer, tableType);
if (tableType == HoodieTableType.COPY_ON_WRITE.name()) {
cfg.workloadDagGenerator = HiveSyncDagGenerator.class.getName();
@@ -238,11 +238,11 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
public void testCOWFullDagFromYaml() throws Exception {
boolean useDeltaStreamer = false;
this.cleanDFSDirs();
- String inputBasePath = dfsBasePath + "/input";
- String outputBasePath = dfsBasePath + "/result";
+ String inputBasePath = basePath + "/input";
+ String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath,
useDeltaStreamer, HoodieTableType
.COPY_ON_WRITE.name());
- cfg.workloadYamlPath = dfsBasePath + "/" + COW_DAG_FILE_NAME;
+ cfg.workloadYamlPath = basePath + "/" + COW_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(new
Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -253,11 +253,11 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
public void testMORFullDagFromYaml() throws Exception {
boolean useDeltaStreamer = false;
this.cleanDFSDirs();
- String inputBasePath = dfsBasePath + "/input";
- String outputBasePath = dfsBasePath + "/result";
+ String inputBasePath = basePath + "/input";
+ String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath,
useDeltaStreamer, HoodieTableType
.MERGE_ON_READ.name());
- cfg.workloadYamlPath = dfsBasePath + "/" + MOR_DAG_FILE_NAME;
+ cfg.workloadYamlPath = basePath + "/" + MOR_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(new
Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -272,13 +272,13 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
TypedProperties props = getProperties();
props.setProperty("hoodie.write.concurrency.mode",
"optimistic_concurrency_control");
props.setProperty("hoodie.failed.writes.cleaner.policy", "LAZY");
- UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath +
"/test-source"
+ UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath +
"/test-source"
+ ".properties");
- String inputBasePath = dfsBasePath + "/input";
- String outputBasePath = dfsBasePath + "/result";
+ String inputBasePath = basePath + "/input";
+ String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath,
useDeltaStreamer, HoodieTableType
.COPY_ON_WRITE.name());
- cfg.workloadYamlPath = dfsBasePath + "/" +
COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES;
+ cfg.workloadYamlPath = basePath + "/" +
COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(new
Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -289,11 +289,11 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
public void testSparkSqlDag() throws Exception {
boolean useDeltaStreamer = false;
this.cleanDFSDirs();
- String inputBasePath = dfsBasePath + "/input";
- String outputBasePath = dfsBasePath + "/result";
+ String inputBasePath = basePath + "/input";
+ String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath,
useDeltaStreamer, HoodieTableType
.COPY_ON_WRITE.name());
- cfg.workloadYamlPath = dfsBasePath + "/" + SPARK_SQL_DAG_FILE_NAME;
+ cfg.workloadYamlPath = basePath + "/" + SPARK_SQL_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
}
@@ -307,7 +307,7 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
cfg.tableType = tableType;
cfg.sourceClassName = AvroDFSSource.class.getName();
cfg.sourceOrderingField = SchemaUtils.SOURCE_ORDERING_FIELD;
- cfg.propsFilePath = dfsBasePath + "/test-source.properties";
+ cfg.propsFilePath = basePath + "/test-source.properties";
cfg.outputTypeName = DeltaOutputMode.DFS.name();
cfg.inputFormatName = DeltaInputType.AVRO.name();
cfg.limitFileSize = 1024 * 1024L;
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
index 9f9439f376..631601fb2c 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
@@ -43,7 +43,7 @@ public class TestDFSAvroDeltaInputReader extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices(true, false, false);
}
@AfterAll
@@ -59,12 +59,12 @@ public class TestDFSAvroDeltaInputReader extends
UtilitiesTestBase {
@Test
@Disabled
public void testDFSSinkReader() throws IOException {
- FileSystem fs = FSUtils.getFs(dfsBasePath, new Configuration());
+ FileSystem fs = FSUtils.getFs(basePath, new Configuration());
// Create 10 avro files with 10 records each
- TestUtils.createAvroFiles(jsc, sparkSession, dfsBasePath, 10, 10);
- FileStatus[] statuses = fs.globStatus(new Path(dfsBasePath + "/*/*.avro"));
+ TestUtils.createAvroFiles(jsc, sparkSession, basePath, 10, 10);
+ FileStatus[] statuses = fs.globStatus(new Path(basePath + "/*/*.avro"));
DFSAvroDeltaInputReader reader =
- new DFSAvroDeltaInputReader(sparkSession,
TestUtils.getSchema().toString(), dfsBasePath, Option.empty(),
+ new DFSAvroDeltaInputReader(sparkSession,
TestUtils.getSchema().toString(), basePath, Option.empty(),
Option.empty());
assertEquals(reader.analyzeSingleFile(statuses[0].getPath().toString()),
5);
assertEquals(reader.read(100).count(), 100);
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
index 80f6e2548c..cf2921d41e 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
@@ -51,7 +51,7 @@ public class TestDFSHoodieDatasetInputReader extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices(true, false, false);
}
@AfterAll
@@ -62,7 +62,7 @@ public class TestDFSHoodieDatasetInputReader extends
UtilitiesTestBase {
@BeforeEach
public void setup() throws Exception {
super.setup();
- HoodieTestUtils.init(jsc.hadoopConfiguration(), dfsBasePath);
+ HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
}
@AfterEach
@@ -117,7 +117,7 @@ public class TestDFSHoodieDatasetInputReader extends
UtilitiesTestBase {
private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() throws
Exception {
// Prepare the AvroParquetIO
- return HoodieWriteConfig.newBuilder().withPath(dfsBasePath)
+ return HoodieWriteConfig.newBuilder().withPath(basePath)
.withParallelism(2, 2)
.withDeleteParallelism(2)
.withSchema(HoodieTestDataGenerator
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
index ad74235ae0..1d85ba0eae 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
@@ -104,14 +104,14 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(true, true);
- PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
- ORC_SOURCE_ROOT = dfsBasePath + "/orcFiles";
- JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
+ UtilitiesTestBase.initTestServices(false, true, true);
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
+ ORC_SOURCE_ROOT = basePath + "/orcFiles";
+ JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
testUtils = new KafkaTestUtils();
testUtils.setup();
topicName = "topic" + testNum;
- prepareInitialConfigs(dfs, dfsBasePath, testUtils.brokerAddress());
+ prepareInitialConfigs(fs, basePath, testUtils.brokerAddress());
prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index e1c296617a..5c1365ad4f 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -250,7 +250,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.operation = op;
cfg.enableHiveSync = enableHiveSync;
cfg.sourceOrderingField = sourceOrderingField;
- cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
+ cfg.propsFilePath = UtilitiesTestBase.basePath + "/" + propsFilename;
cfg.sourceLimit = sourceLimit;
cfg.checkpoint = checkpoint;
if (updatePayloadClass) {
@@ -272,7 +272,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.sourceClassName = HoodieIncrSource.class.getName();
cfg.operation = op;
cfg.sourceOrderingField = "timestamp";
- cfg.propsFilePath = dfsBasePath + "/test-downstream-source.properties";
+ cfg.propsFilePath = UtilitiesTestBase.basePath +
"/test-downstream-source.properties";
cfg.sourceLimit = 1000;
if (null != schemaProviderClassName) {
cfg.schemaProviderClassName = schemaProviderClassName;
@@ -448,7 +448,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testProps() {
TypedProperties props =
- new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath +
"/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
+ new DFSPropertiesConfiguration(fs.getConf(), new Path(basePath + "/" +
PROPS_FILENAME_TEST_SOURCE)).getProps();
assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
assertEquals("_row_key",
props.getString("hoodie.datasource.write.recordkey.field"));
assertEquals("org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator",
@@ -561,30 +561,30 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testKafkaConnectCheckpointProvider() throws IOException {
- String tableBasePath = dfsBasePath + "/test_table";
- String bootstrapPath = dfsBasePath + "/kafka_topic1";
+ String tableBasePath = basePath + "/test_table";
+ String bootstrapPath = basePath + "/kafka_topic1";
String partitionPath = bootstrapPath + "/year=2016/month=05/day=01";
String filePath = partitionPath + "/kafka_topic1+0+100+200.parquet";
String checkpointProviderClass =
"org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";
HoodieDeltaStreamer.Config cfg =
TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
TypedProperties props =
- new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath +
"/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
+ new DFSPropertiesConfiguration(fs.getConf(), new Path(basePath + "/" +
PROPS_FILENAME_TEST_SOURCE)).getProps();
props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
cfg.initialCheckpointProvider = checkpointProviderClass;
// create regular kafka connect hdfs dirs
- dfs.mkdirs(new Path(bootstrapPath));
- dfs.mkdirs(new Path(partitionPath));
+ fs.mkdirs(new Path(bootstrapPath));
+ fs.mkdirs(new Path(partitionPath));
// generate parquet files using kafka connect naming convention
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts("000",
100)), new Path(filePath));
- HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, dfs,
hdfsTestService.getHadoopConf(), Option.ofNullable(props));
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, fs,
jsc.hadoopConfiguration(), Option.ofNullable(props));
assertEquals("kafka_topic1,0:200", deltaStreamer.getConfig().checkpoint);
}
@Test
public void testPropsWithInvalidKeyGenerator() throws Exception {
Exception e = assertThrows(IOException.class, () -> {
- String tableBasePath = dfsBasePath + "/test_table_invalid_key_gen";
+ String tableBasePath = basePath + "/test_table_invalid_key_gen";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()),
PROPS_FILENAME_TEST_INVALID, false), jsc);
@@ -598,9 +598,9 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testTableCreation() throws Exception {
Exception e = assertThrows(TableNotFoundException.class, () -> {
- dfs.mkdirs(new Path(dfsBasePath + "/not_a_table"));
+ fs.mkdirs(new Path(basePath + "/not_a_table"));
HoodieDeltaStreamer deltaStreamer =
- new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath +
"/not_a_table", WriteOperationType.BULK_INSERT), jsc);
+ new HoodieDeltaStreamer(TestHelpers.makeConfig(basePath +
"/not_a_table", WriteOperationType.BULK_INSERT), jsc);
deltaStreamer.sync();
}, "Should error out when pointed out at a dir thats not a table");
// expected
@@ -609,7 +609,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
- String tableBasePath = dfsBasePath + "/test_table";
+ String tableBasePath = basePath + "/test_table";
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT);
@@ -627,7 +627,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
assertEquals(1950, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
// Perform bootstrap with tableBasePath as source
- String bootstrapSourcePath = dfsBasePath + "/src_bootstrapped";
+ String bootstrapSourcePath = basePath + "/src_bootstrapped";
Dataset<Row> sourceDf = sqlContext.read()
.format("org.apache.hudi")
.load(tableBasePath);
@@ -635,7 +635,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
// in the value. Currently it fails the tests due to slash encoding.
sourceDf.write().format("parquet").partitionBy("rider").save(bootstrapSourcePath);
- String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
+ String newDatasetBasePath = basePath + "/test_dataset_bootstrapped";
cfg.runBootstrap = true;
cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s",
bootstrapSourcePath));
cfg.configs.add(String.format("%s=%s",
DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
@@ -661,7 +661,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testModifiedTableConfigs() throws Exception {
- String tableBasePath = dfsBasePath + "/test_table_modified_configs";
+ String tableBasePath = basePath + "/test_table_modified_configs";
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT);
@@ -693,32 +693,32 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(expected, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(expected, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata(metadata, tableBasePath, dfs,
totalCommits);
+ TestHelpers.assertCommitMetadata(metadata, tableBasePath, fs,
totalCommits);
}
@ParameterizedTest
@MethodSource("schemaEvolArgs")
public void testSchemaEvolution(String tableType, boolean
useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception {
- String tableBasePath = dfsBasePath + "/test_table_schema_evolution" +
tableType + "_" + useUserProvidedSchema + "_" + useSchemaPostProcessor;
+ String tableBasePath = basePath + "/test_table_schema_evolution" +
tableType + "_" + useUserProvidedSchema + "_" + useSchemaPostProcessor;
defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
// Insert data produced with Schema A, pass Schema A
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.INSERT,
Collections.singletonList(TestIdentityTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
- cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file="
+ dfsBasePath + "/source.avsc");
- cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file="
+ dfsBasePath + "/source.avsc");
+ cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file="
+ basePath + "/source.avsc");
+ cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file="
+ basePath + "/source.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
if (!useSchemaPostProcessor) {
cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE
+ "=false");
}
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
// Upsert data produced with Schema B, pass Schema B
cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
- cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file="
+ dfsBasePath + "/source.avsc");
- cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file="
+ dfsBasePath + "/source_evolved.avsc");
+ cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file="
+ basePath + "/source.avsc");
+ cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file="
+ basePath + "/source_evolved.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
if (!useSchemaPostProcessor) {
cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE
+ "=false");
@@ -726,7 +726,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
new HoodieDeltaStreamer(cfg, jsc).sync();
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+ TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
assertEquals(1450, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
@@ -741,9 +741,9 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Collections.singletonList(TestIdentityTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
- cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file="
+ dfsBasePath + "/source.avsc");
+ cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file="
+ basePath + "/source.avsc");
if (useUserProvidedSchema) {
-
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" +
dfsBasePath + "/source_evolved.avsc");
+
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" +
basePath + "/source_evolved.avsc");
}
if (!useSchemaPostProcessor) {
cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE
+ "=false");
@@ -752,15 +752,15 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
new HoodieDeltaStreamer(cfg, jsc).sync();
// again, 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00002", tableBasePath, dfs, 3);
+ TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
assertEquals(1900, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
- TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(dfs.getConf()).build());
+ TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
Schema tableSchema =
tableSchemaResolver.getTableAvroSchemaWithoutMetadataFields();
assertNotNull(tableSchema);
- Schema expectedSchema = new Schema.Parser().parse(dfs.open(new
Path(dfsBasePath + "/source_evolved.avsc")));
+ Schema expectedSchema = new Schema.Parser().parse(fs.open(new
Path(basePath + "/source_evolved.avsc")));
if (!useUserProvidedSchema || useSchemaPostProcessor) {
expectedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
AvroConversionUtils.convertAvroSchemaToStructType(expectedSchema),
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
@@ -768,8 +768,8 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
assertEquals(tableSchema, expectedSchema);
// clean up and reinit
-
UtilitiesTestBase.Helpers.deleteFileFromDfs(FSUtils.getFs(cfg.targetBasePath,
jsc.hadoopConfiguration()), dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
- writeCommonPropsToFile(dfs, dfsBasePath);
+
UtilitiesTestBase.Helpers.deleteFileFromDfs(FSUtils.getFs(cfg.targetBasePath,
jsc.hadoopConfiguration()), basePath + "/" + PROPS_FILENAME_TEST_SOURCE);
+ writeCommonPropsToFile(fs, basePath);
defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
}
@@ -780,7 +780,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
- String tableBasePath = dfsBasePath + "/non_continuous_cow";
+ String tableBasePath = basePath + "/non_continuous_cow";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
@@ -804,7 +804,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
- String tableBasePath = dfsBasePath + "/non_continuous_mor";
+ String tableBasePath = basePath + "/non_continuous_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
@@ -821,7 +821,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
private void testUpsertsContinuousMode(HoodieTableType tableType, String
tempDir, boolean testShutdownGracefully) throws Exception {
- String tableBasePath = dfsBasePath + "/" + tempDir;
+ String tableBasePath = basePath + "/" + tempDir;
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
// Initial bulk insert
@@ -836,10 +836,10 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
- TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, dfs);
- TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, fs);
+ TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs);
} else {
- TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs);
+ TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, fs);
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext);
@@ -902,7 +902,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@ParameterizedTest
@ValueSource(strings = {"true", "false"})
public void testInlineClustering(String preserveCommitMetadata) throws
Exception {
- String tableBasePath = dfsBasePath + "/inlineClustering";
+ String tableBasePath = basePath + "/inlineClustering";
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
@@ -913,15 +913,15 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true",
"2", "", "", preserveCommitMetadata));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
- TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
- TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
return true;
});
}
@Test
public void testDeltaSyncWithPendingClustering() throws Exception {
- String tableBasePath = dfsBasePath + "/inlineClusteringPending";
+ String tableBasePath = basePath + "/inlineClusteringPending";
// ingest data
int totalRecords = 2000;
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.INSERT);
@@ -930,12 +930,12 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
// assert ingest successful
- TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
// schedule a clustering job to build a clustering plan and transition to
inflight
HoodieClusteringJob clusteringJob =
initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
clusteringJob.cluster(0);
- HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
List<HoodieInstant> hoodieClusteringInstants =
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
Option.empty());
@@ -947,14 +947,14 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
ds2.sync();
String completeClusteringTimeStamp =
meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
assertEquals(clusteringRequest.getTimestamp(),
completeClusteringTimeStamp);
- TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
- TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean)
throws Exception {
- String tableBasePath = dfsBasePath +
"/cleanerDeleteReplacedDataWithArchive" + asyncClean;
+ String tableBasePath = basePath + "/cleanerDeleteReplacedDataWithArchive"
+ asyncClean;
int totalRecords = 3000;
@@ -969,15 +969,15 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.add(String.format("%s=%s",
HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
- TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
return true;
});
- TestHelpers.assertAtLeastNCommits(6, tableBasePath, dfs);
- TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(6, tableBasePath, fs);
+ TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
// Step 2 : Get the first replacecommit and extract the corresponding
replaced file IDs.
- HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline replacedTimeline =
meta.reloadActiveTimeline().getCompletedReplaceTimeline();
Option<HoodieInstant> firstReplaceHoodieInstant =
replacedTimeline.nthFromLastInstant(1);
assertTrue(firstReplaceHoodieInstant.isPresent());
@@ -1087,7 +1087,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
config.basePath = basePath;
config.clusteringInstantTime = clusteringInstantTime;
config.runSchedule = runSchedule;
- config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
+ config.propsFilePath = UtilitiesTestBase.basePath +
"/clusteringjob.properties";
config.runningMode = runningMode;
if (retryLastFailedClusteringJob != null) {
config.retryLastFailedClusteringJob = retryLastFailedClusteringJob;
@@ -1104,7 +1104,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
config.basePath = basePath;
config.tableName = tableName;
config.indexInstantTime = indexInstantTime;
- config.propsFilePath = dfsBasePath + "/indexer.properties";
+ config.propsFilePath = UtilitiesTestBase.basePath + "/indexer.properties";
config.runningMode = runningMode;
config.indexTypes = indexTypes;
return config;
@@ -1112,11 +1112,11 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testHoodieIndexer() throws Exception {
- String tableBasePath = dfsBasePath + "/asyncindexer";
+ String tableBasePath = basePath + "/asyncindexer";
HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 1000,
"false");
deltaStreamerTestRunner(ds, (r) -> {
- TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
Option<String> scheduleIndexInstantTime = Option.empty();
try {
@@ -1128,13 +1128,13 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
return false;
}
if (scheduleIndexInstantTime.isPresent()) {
- TestHelpers.assertPendingIndexCommit(tableBasePath, dfs);
+ TestHelpers.assertPendingIndexCommit(tableBasePath, fs);
LOG.info("Schedule indexing success, now build index with instant time
" + scheduleIndexInstantTime.get());
HoodieIndexer runIndexingJob = new HoodieIndexer(jsc,
buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName,
scheduleIndexInstantTime.get(), EXECUTE, "COLUMN_STATS"));
runIndexingJob.start(0);
LOG.info("Metadata indexing success");
- TestHelpers.assertCompletedIndexCommit(tableBasePath, dfs);
+ TestHelpers.assertCompletedIndexCommit(tableBasePath, fs);
} else {
LOG.warn("Metadata indexing failed");
}
@@ -1145,12 +1145,12 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieAsyncClusteringJob(boolean
shouldPassInClusteringInstantTime) throws Exception {
- String tableBasePath = dfsBasePath + "/asyncClusteringJob";
+ String tableBasePath = basePath + "/asyncClusteringJob";
HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000,
"false");
CountDownLatch countDownLatch = new CountDownLatch(1);
deltaStreamerTestRunner(ds, (r) -> {
- TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
countDownLatch.countDown();
return true;
});
@@ -1171,7 +1171,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
shouldPassInClusteringInstantTime ?
scheduleClusteringInstantTime.get() : null, false);
HoodieClusteringJob clusterClusteringJob = new
HoodieClusteringJob(jsc, clusterClusteringConfig);
clusterClusteringJob.cluster(clusterClusteringConfig.retry);
- TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
LOG.info("Cluster success");
} else {
LOG.warn("Clustering execution failed");
@@ -1184,7 +1184,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testAsyncClusteringService() throws Exception {
- String tableBasePath = dfsBasePath + "/asyncClustering";
+ String tableBasePath = basePath + "/asyncClustering";
// Keep it higher than batch-size to test continuous mode
int totalRecords = 2000;
@@ -1195,12 +1195,12 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
"true", "3"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
- TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
return true;
});
// There should be 4 commits, one of which should be a replace commit
- TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
- TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath,
sqlContext);
}
@@ -1212,7 +1212,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
*/
@Test
public void testAsyncClusteringServiceWithConflicts() throws Exception {
- String tableBasePath = dfsBasePath + "/asyncClusteringWithConflicts";
+ String tableBasePath = basePath + "/asyncClusteringWithConflicts";
// Keep it higher than batch-size to test continuous mode
int totalRecords = 2000;
@@ -1223,18 +1223,18 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
"true", "3"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
- TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
return true;
});
// There should be 4 commits, one of which should be a replace commit
- TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
- TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
TestHelpers.assertDistinctRecordCount(1900, tableBasePath, sqlContext);
}
@Test
public void testAsyncClusteringServiceWithCompaction() throws Exception {
- String tableBasePath = dfsBasePath + "/asyncClusteringCompaction";
+ String tableBasePath = basePath + "/asyncClusteringCompaction";
// Keep it higher than batch-size to test continuous mode
int totalRecords = 2000;
@@ -1245,20 +1245,20 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
"true", "3"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
- TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
- TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
return true;
});
// There should be 4 commits, one of which should be a replace commit
- TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
- TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath,
sqlContext);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testAsyncClusteringJobWithRetry(boolean
retryLastFailedClusteringJob) throws Exception {
- String tableBasePath = dfsBasePath + "/asyncClustering3";
+ String tableBasePath = basePath + "/asyncClustering3";
// ingest data
int totalRecords = 3000;
@@ -1270,7 +1270,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
ds.sync();
// assert ingest successful
- TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
// schedule a clustering job to build a clustering plan
HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath,
null, false, "schedule");
@@ -1281,7 +1281,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
ds2.sync();
// convert clustering request into inflight, Simulate the last clustering
failed scenario
- HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
List<HoodieInstant> hoodieClusteringInstants =
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
HoodieInstant hoodieInflightInstant =
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
Option.empty());
@@ -1304,13 +1304,13 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@ParameterizedTest
@ValueSource(strings = {"execute", "schedule", "scheduleAndExecute"})
public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String
runningMode) throws Exception {
- String tableBasePath = dfsBasePath + "/asyncClustering2";
+ String tableBasePath = basePath + "/asyncClustering2";
HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000,
"false");
HoodieClusteringJob scheduleClusteringJob =
initialHoodieClusteringJob(tableBasePath, null, true, runningMode);
deltaStreamerTestRunner(ds, (r) -> {
Exception exception = null;
- TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
try {
int result = scheduleClusteringJob.cluster(0);
if (result == 0) {
@@ -1330,16 +1330,16 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
switch (runningMode.toLowerCase()) {
case SCHEDULE_AND_EXECUTE: {
- TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
return true;
}
case SCHEDULE: {
- TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, dfs);
- TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, fs);
+ TestHelpers.assertNoReplaceCommits(tableBasePath, fs);
return true;
}
case EXECUTE: {
- TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
+ TestHelpers.assertNoReplaceCommits(tableBasePath, fs);
return true;
}
default:
@@ -1355,37 +1355,37 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
*/
@Test
public void
testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws
Exception {
- String tableBasePath = dfsBasePath + "/test_table2";
- String downstreamTableBasePath = dfsBasePath + "/test_downstream_table2";
+ String tableBasePath = basePath + "/test_table2";
+ String downstreamTableBasePath = basePath + "/test_downstream_table2";
// Initial bulk insert to ingest to first hudi table
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, true);
// NOTE: We should not have need to set below config, 'datestr' should
have assumed date partitioning
cfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day");
- new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath,
sqlContext);
- String lastInstantForUpstreamTable =
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+ String lastInstantForUpstreamTable =
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
// Now incrementally pull from the above hudi table and ingest to second
table
HoodieDeltaStreamer.Config downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath,
downstreamTableBasePath, WriteOperationType.BULK_INSERT,
true, null);
- new HoodieDeltaStreamer(downstreamCfg, jsc, dfs,
hiveServer.getHiveConf()).sync();
+ new HoodieDeltaStreamer(downstreamCfg, jsc, fs,
hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext);
TestHelpers.assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000,
downstreamTableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, dfs, 1);
+ TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, fs, 1);
// No new data => no commits for upstream table
cfg.sourceLimit = 0;
- new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath,
sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
// with no change in upstream table, no change in downstream too when
pulled.
HoodieDeltaStreamer.Config downstreamCfg1 =
@@ -1395,16 +1395,16 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext);
TestHelpers.assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000,
downstreamTableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, dfs, 1);
+ TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, fs, 1);
// upsert() #1 on upstream hudi table
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.UPSERT;
- new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1950, tableBasePath,
sqlContext);
- lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001",
tableBasePath, dfs, 2);
+ lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001",
tableBasePath, fs, 2);
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
assertEquals(1950, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
@@ -1418,7 +1418,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TestHelpers.assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
TestHelpers.assertDistanceCountWithExactValue(2000,
downstreamTableBasePath, sqlContext);
String finalInstant =
- TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, dfs, 2);
+ TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable,
downstreamTableBasePath, fs, 2);
counts = TestHelpers.countsPerCommit(downstreamTableBasePath, sqlContext);
assertEquals(2000, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
@@ -1438,12 +1438,12 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testNullSchemaProvider() throws Exception {
- String tableBasePath = dfsBasePath + "/test_table";
+ String tableBasePath = basePath + "/test_table";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, true,
false, false, null, null);
Exception e = assertThrows(HoodieException.class, () -> {
- new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
}, "Should error out when schema provider is not provided");
LOG.debug("Expected error during reading data from source ", e);
assertTrue(e.getMessage().contains("Please provide a valid schema provider
class!"));
@@ -1451,18 +1451,18 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testPayloadClassUpdate() throws Exception {
- String dataSetBasePath = dfsBasePath +
"/test_dataset_mor_payload_class_update";
+ String dataSetBasePath = basePath +
"/test_dataset_mor_payload_class_update";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
true, false, null, "MERGE_ON_READ");
- new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext);
//now create one more deltaStreamer instance and update payload class
cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
- new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
//now assert that hoodie.properties file now has updated payload class name
Properties props = new Properties();
@@ -1477,11 +1477,11 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testPartialPayloadClass() throws Exception {
- String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
+ String dataSetBasePath = basePath + "/test_dataset_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
true, true, PartialUpdateAvroPayload.class.getName(),
"MERGE_ON_READ");
- new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext);
//now assert that hoodie.properties file now has updated payload class name
@@ -1496,18 +1496,18 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testPayloadClassUpdateWithCOWTable() throws Exception {
- String dataSetBasePath = dfsBasePath + "/test_dataset_cow";
+ String dataSetBasePath = basePath + "/test_dataset_cow";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
true, false, null, null);
- new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext);
//now create one more deltaStreamer instance and update payload class
cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
true, true, DummyAvroPayload.class.getName(), null);
- new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
//now assert that hoodie.properties file does not have payload class prop
since it is a COW table
Properties props = new Properties();
@@ -1522,13 +1522,13 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testFilterDupes() throws Exception {
- String tableBasePath = dfsBasePath + "/test_dupes_table";
+ String tableBasePath = basePath + "/test_dupes_table";
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
// Generate the same 1000 records + 1000 new ones for upsert
cfg.filterDupes = true;
@@ -1536,7 +1536,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.operation = WriteOperationType.INSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(2000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+ TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
// 1000 records for commit 00000 & 1000 for commit 00001
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
assertEquals(1000, counts.get(0).getLong(1));
@@ -1621,7 +1621,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TypedProperties parquetProps = new TypedProperties();
if (addCommonProps) {
- populateCommonProps(parquetProps, dfsBasePath);
+ populateCommonProps(parquetProps, basePath);
}
parquetProps.setProperty("hoodie.datasource.write.keygenerator.class",
TestHoodieDeltaStreamer.TestGenerator.class.getName());
@@ -1631,16 +1631,16 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
parquetProps.setProperty("hoodie.datasource.write.recordkey.field",
"_row_key");
parquetProps.setProperty("hoodie.datasource.write.partitionpath.field",
partitionPath);
if (useSchemaProvider) {
-
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/" + sourceSchemaFile);
+
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
basePath + "/" + sourceSchemaFile);
if (hasTransformer) {
-
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/" + targetSchemaFile);
+
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
basePath + "/" + targetSchemaFile);
}
}
parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root",
parquetSourceRoot);
if (!StringUtils.isNullOrEmpty(emptyBatchParam)) {
parquetProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH,
emptyBatchParam);
}
- UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath +
"/" + propsFileName);
+ UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, fs, basePath + "/"
+ propsFileName);
}
private void testParquetDFSSource(boolean useSchemaProvider, List<String>
transformerClassNames) throws Exception {
@@ -1648,14 +1648,14 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
private void testParquetDFSSource(boolean useSchemaProvider, List<String>
transformerClassNames, boolean testEmptyBatch) throws Exception {
- PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfs" + testNum;
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
int parquetRecordsCount = 10;
boolean hasTransformer = transformerClassNames != null &&
!transformerClassNames.isEmpty();
prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT,
FIRST_PARQUET_FILE_NAME, false, null, null);
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc",
"target.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" :
"");
- String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
+ String tableBasePath = basePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
: ParquetDFSSource.class.getName(),
@@ -1706,15 +1706,15 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
orcProps.setProperty("hoodie.datasource.write.recordkey.field",
"_row_key");
orcProps.setProperty("hoodie.datasource.write.partitionpath.field",
"partition_path");
if (useSchemaProvider) {
-
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/" + "source.avsc");
+
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
basePath + "/" + "source.avsc");
if (transformerClassNames != null) {
-
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/" + "target.avsc");
+
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
basePath + "/" + "target.avsc");
}
}
orcProps.setProperty("hoodie.deltastreamer.source.dfs.root",
ORC_SOURCE_ROOT);
- UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, dfs, dfsBasePath + "/"
+ PROPS_FILENAME_TEST_ORC);
+ UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, fs, basePath + "/" +
PROPS_FILENAME_TEST_ORC);
- String tableBasePath = dfsBasePath + "/test_orc_source_table" + testNum;
+ String tableBasePath = basePath + "/test_orc_source_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
ORCDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
@@ -1727,7 +1727,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
private void prepareJsonKafkaDFSSource(String propsFileName, String
autoResetValue, String topicName) throws IOException {
// Properties used for testing delta-streamer with JsonKafka source
TypedProperties props = new TypedProperties();
- populateAllCommonProps(props, dfsBasePath, testUtils.brokerAddress());
+ populateAllCommonProps(props, basePath, testUtils.brokerAddress());
props.setProperty("include", "base.properties");
props.setProperty("hoodie.embed.timeline.server", "false");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -1735,11 +1735,11 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
props.setProperty("hoodie.deltastreamer.source.dfs.root",
JSON_KAFKA_SOURCE_ROOT);
props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type",
kafkaCheckpointType);
-
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source_uber.avsc");
-
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target_uber.avsc");
+
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
basePath + "/source_uber.avsc");
+
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
basePath + "/target_uber.avsc");
props.setProperty("auto.offset.reset", autoResetValue);
- UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" +
propsFileName);
+ UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/" +
propsFileName);
}
/**
@@ -1750,14 +1750,14 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
*/
private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean
autoResetToLatest) throws Exception {
// prep parquet source
- PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfsToKafka" + testNum;
int parquetRecords = 10;
prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT,
FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA,
HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
prepareParquetDFSSource(true, false, "source_uber.avsc",
"target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false, "driver");
// delta streamer w/ parquet source
- String tableBasePath = dfsBasePath + "/test_dfs_to_kafka" + testNum;
+ String tableBasePath = basePath + "/test_dfs_to_kafka" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
ParquetDFSSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
@@ -1793,7 +1793,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
topicName = "topic" + testNum;
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",
topicName);
- String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
+ String tableBasePath = basePath + "/test_json_kafka_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
JsonKafkaSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
@@ -1815,7 +1815,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
kafkaCheckpointType = "timestamp";
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",
topicName);
- String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
+ String tableBasePath = basePath + "/test_json_kafka_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
JsonKafkaSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
@@ -1865,7 +1865,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
private void testDeltaStreamerRestartAfterMissingHoodieProps(boolean
testInitFailure) throws Exception {
- PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfs" + testNum;
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
int parquetRecordsCount = 10;
boolean hasTransformer = false;
boolean useSchemaProvider = false;
@@ -1873,7 +1873,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc",
"target.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false, "partition_path", "0");
- String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
+ String tableBasePath = basePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
testInitFailure ? TestParquetDFSSourceEmptyBatch.class.getName() :
ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
@@ -1881,17 +1881,17 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
deltaStreamer.sync();
if (testInitFailure) {
- FileStatus[] fileStatuses = dfs.listStatus(new Path(tableBasePath +
"/.hoodie/"));
+ FileStatus[] fileStatuses = fs.listStatus(new Path(tableBasePath +
"/.hoodie/"));
Arrays.stream(fileStatuses).filter(entry ->
entry.getPath().getName().contains("commit") ||
entry.getPath().getName().contains("inflight")).forEach(entry -> {
try {
- dfs.delete(entry.getPath());
+ fs.delete(entry.getPath());
} catch (IOException e) {
e.printStackTrace();
}
});
}
// delete hoodie.properties
- dfs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"));
+ fs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"));
// restart the pipeline.
if (testInitFailure) { // should succeed.
@@ -1937,7 +1937,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
private void prepareCsvDFSSource(
boolean hasHeader, char sep, boolean useSchemaProvider, boolean
hasTransformer) throws IOException {
- String sourceRoot = dfsBasePath + "/csvFiles";
+ String sourceRoot = basePath + "/csvFiles";
String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" :
"_c1";
String partitionPath = (hasHeader || useSchemaProvider) ? "partition_path"
: "_c2";
@@ -1947,9 +1947,9 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
csvProps.setProperty("hoodie.datasource.write.recordkey.field",
recordKeyField);
csvProps.setProperty("hoodie.datasource.write.partitionpath.field",
partitionPath);
if (useSchemaProvider) {
-
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source-flattened.avsc");
+
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
basePath + "/source-flattened.avsc");
if (hasTransformer) {
-
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target-flattened.avsc");
+
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
basePath + "/target-flattened.avsc");
}
}
csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
@@ -1965,20 +1965,20 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
csvProps.setProperty("hoodie.deltastreamer.csv.header",
Boolean.toString(hasHeader));
}
- UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/"
+ PROPS_FILENAME_TEST_CSV);
+ UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, fs, basePath + "/" +
PROPS_FILENAME_TEST_CSV);
String path = sourceRoot + "/1.csv";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
UtilitiesTestBase.Helpers.saveCsvToDFS(
hasHeader, sep,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000",
CSV_NUM_RECORDS, true)),
- dfs, path);
+ fs, path);
}
private void testCsvDFSSource(
boolean hasHeader, char sep, boolean useSchemaProvider, List<String>
transformerClassNames) throws Exception {
prepareCsvDFSSource(hasHeader, sep, useSchemaProvider,
transformerClassNames != null);
- String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
+ String tableBasePath = basePath + "/test_csv_table" + testNum;
String sourceOrderingField = (hasHeader || useSchemaProvider) ?
"timestamp" : "_c0";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(
@@ -2075,7 +2075,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
private void prepareSqlSource() throws IOException {
- String sourceRoot = dfsBasePath + "sqlSourceFiles";
+ String sourceRoot = basePath + "sqlSourceFiles";
TypedProperties sqlSourceProps = new TypedProperties();
sqlSourceProps.setProperty("include", "base.properties");
sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
@@ -2083,7 +2083,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field",
"partition_path");
sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query","select
* from test_sql_table");
- UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, dfs, dfsBasePath
+ "/" + PROPS_FILENAME_TEST_SQL_SOURCE);
+ UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, fs, basePath +
"/" + PROPS_FILENAME_TEST_SQL_SOURCE);
// Data generation
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
@@ -2099,7 +2099,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testSqlSourceSource() throws Exception {
prepareSqlSource();
- String tableBasePath = dfsBasePath + "/test_sql_source_table" + testNum++;
+ String tableBasePath = basePath + "/test_sql_source_table" + testNum++;
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(
tableBasePath, WriteOperationType.INSERT,
SqlSource.class.getName(),
@@ -2126,11 +2126,11 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
props.setProperty("hoodie.datasource.write.recordkey.field", "ID");
props.setProperty("hoodie.datasource.write.partitionpath.field",
"partition_path");
- UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath +
"/test-jdbc-source.properties");
+ UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath +
"/test-jdbc-source.properties");
int numRecords = 1000;
int sourceLimit = 100;
- String tableBasePath = dfsBasePath + "/triprec";
+ String tableBasePath = basePath + "/triprec";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.INSERT, JdbcSource.class.getName(),
null, "test-jdbc-source.properties", false,
false, sourceLimit, false, null, null, "timestamp", null);
@@ -2140,7 +2140,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(deltaStreamer, cfg, (r) -> {
- TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit +
((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath, dfs);
+ TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit +
((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath, fs);
TestHelpers.assertRecordCount(numRecords, tableBasePath, sqlContext);
return true;
});
@@ -2151,8 +2151,8 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testHoodieIncrFallback() throws Exception {
- String tableBasePath = dfsBasePath + "/incr_test_table";
- String downstreamTableBasePath = dfsBasePath +
"/incr_test_downstream_table";
+ String tableBasePath = basePath + "/incr_test_table";
+ String downstreamTableBasePath = basePath + "/incr_test_downstream_table";
insertInTable(tableBasePath, 1, WriteOperationType.BULK_INSERT);
HoodieDeltaStreamer.Config downstreamCfg =
@@ -2202,12 +2202,12 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testInsertOverwrite() throws Exception {
- testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite",
WriteOperationType.INSERT_OVERWRITE);
+ testDeltaStreamerWithSpecifiedOperation(basePath + "/insert_overwrite",
WriteOperationType.INSERT_OVERWRITE);
}
@Test
public void testInsertOverwriteTable() throws Exception {
- testDeltaStreamerWithSpecifiedOperation(dfsBasePath +
"/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE);
+ testDeltaStreamerWithSpecifiedOperation(basePath +
"/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE);
}
@Disabled("Local run passing; flaky in CI environment.")
@@ -2215,7 +2215,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
public void testDeletePartitions() throws Exception {
prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false,
"partition_path");
- String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
+ String tableBasePath = basePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
@@ -2242,7 +2242,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
// setting the operationType
cfg.operation = operationType;
@@ -2251,27 +2251,27 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
cfg.sourceLimit = 1000;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+ TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
}
@Test
public void testFetchingCheckpointFromPreviousCommits() throws IOException {
- HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dfsBasePath +
"/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT);
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(basePath +
"/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT);
TypedProperties properties = new TypedProperties();
properties.setProperty("hoodie.datasource.write.recordkey.field","key");
properties.setProperty("hoodie.datasource.write.partitionpath.field","pp");
TestDeltaSync testDeltaSync = new TestDeltaSync(cfg, sparkSession, null,
properties,
- jsc, dfs, jsc.hadoopConfiguration(), null);
+ jsc, fs, jsc.hadoopConfiguration(), null);
properties.put(HoodieTableConfig.NAME.key(), "sample_tbl");
- HoodieTableMetaClient metaClient =
HoodieTestUtils.init(jsc.hadoopConfiguration(), dfsBasePath,
HoodieTableType.COPY_ON_WRITE, properties);
+ HoodieTableMetaClient metaClient =
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath,
HoodieTableType.COPY_ON_WRITE, properties);
Map<String, String> extraMetadata = new HashMap<>();
extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "abc");
@@ -2295,17 +2295,17 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testDropPartitionColumns() throws Exception {
- String tableBasePath = dfsBasePath + "/test_drop_partition_columns" +
testNum++;
+ String tableBasePath = basePath + "/test_drop_partition_columns" +
testNum++;
// ingest data with dropping partition columns enabled
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.UPSERT);
cfg.configs.add(String.format("%s=%s",
HoodieTableConfig.DROP_PARTITION_COLUMNS.key(), "true"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
// assert ingest successful
- TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
-
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(dfs.getConf()).build());
+
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
// get schema from data file written in the latest commit
Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
assertNotNull(tableSchema);
@@ -2317,7 +2317,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testForceEmptyMetaSync() throws Exception {
- String tableBasePath = dfsBasePath + "/test_force_empty_meta_sync";
+ String tableBasePath = basePath + "/test_force_empty_meta_sync";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT);
cfg.sourceLimit = 0;
@@ -2325,7 +2325,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.enableMetaSync = true;
cfg.forceEmptyMetaSync = true;
- new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(0, tableBasePath, sqlContext);
// make sure hive table is present
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index 8f54b0d34d..8ac8b616b5 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -62,8 +62,8 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
HoodieMultiTableDeltaStreamer.Config config = new
HoodieMultiTableDeltaStreamer.Config();
config.configFolder = configFolder;
config.targetTableName = "dummy_table";
- config.basePathPrefix = dfsBasePath + "/" + basePathPrefix;
- config.propsFilePath = dfsBasePath + "/" + fileName;
+ config.basePathPrefix = basePath + "/" + basePathPrefix;
+ config.propsFilePath = basePath + "/" + fileName;
config.tableType = "COPY_ON_WRITE";
config.sourceClassName = sourceClassName;
config.sourceOrderingField = "timestamp";
@@ -79,7 +79,7 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
@Test
public void testMetaSyncConfig() throws IOException {
- HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config",
TestDataSource.class.getName(), true, true, null);
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config",
TestDataSource.class.getName(), true, true, null);
HoodieMultiTableDeltaStreamer streamer = new
HoodieMultiTableDeltaStreamer(cfg, jsc);
TableExecutionContext executionContext =
streamer.getTableExecutionContexts().get(1);
assertEquals("com.example.DummySyncTool1,com.example.DummySyncTool2",
executionContext.getConfig().syncClientToolClassNames);
@@ -87,7 +87,7 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
@Test
public void testInvalidHiveSyncProps() throws IOException {
- HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath +
"/config", TestDataSource.class.getName(), true, true, null);
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, basePath +
"/config", TestDataSource.class.getName(), true, true, null);
Exception e = assertThrows(HoodieException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when hive sync table not provided with enableHiveSync
flag");
@@ -97,7 +97,7 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
@Test
public void testInvalidPropsFilePath() throws IOException {
- HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config",
TestDataSource.class.getName(), true, true, null);
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_INVALID_FILE, basePath + "/config",
TestDataSource.class.getName(), true, true, null);
Exception e = assertThrows(IllegalArgumentException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when invalid props file is provided");
@@ -107,7 +107,7 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
@Test
public void testInvalidTableConfigFilePath() throws IOException {
- HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config",
TestDataSource.class.getName(), true, true, null);
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, basePath + "/config",
TestDataSource.class.getName(), true, true, null);
Exception e = assertThrows(IllegalArgumentException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when invalid table config props file path is provided");
@@ -117,11 +117,11 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
@Test
public void testCustomConfigProps() throws IOException {
- HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config",
TestDataSource.class.getName(), false, false, SchemaRegistryProvider.class);
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config",
TestDataSource.class.getName(), false, false, SchemaRegistryProvider.class);
HoodieMultiTableDeltaStreamer streamer = new
HoodieMultiTableDeltaStreamer(cfg, jsc);
TableExecutionContext executionContext =
streamer.getTableExecutionContexts().get(1);
assertEquals(2, streamer.getTableExecutionContexts().size());
- assertEquals(dfsBasePath +
"/multi_table_dataset/uber_db/dummy_table_uber",
executionContext.getConfig().targetBasePath);
+ assertEquals(basePath + "/multi_table_dataset/uber_db/dummy_table_uber",
executionContext.getConfig().targetBasePath);
assertEquals("uber_db.dummy_table_uber",
executionContext.getConfig().targetTableName);
assertEquals("topic1",
executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP));
assertEquals("_row_key",
executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key()));
@@ -136,7 +136,7 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
@Disabled
public void testInvalidIngestionProps() {
Exception e = assertThrows(Exception.class, () -> {
- HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config",
TestDataSource.class.getName(), true, true, null);
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config",
TestDataSource.class.getName(), true, true, null);
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Creation of execution object should fail without kafka topic");
LOG.debug("Creation of execution object failed with error: " +
e.getMessage(), e);
@@ -155,18 +155,18 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
testUtils.sendMessages(topicName1,
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5,
HoodieTestDataGenerator.TRIP_SCHEMA)));
testUtils.sendMessages(topicName2,
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10,
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
- HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config",
JsonKafkaSource.class.getName(), false, false, null);
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config",
JsonKafkaSource.class.getName(), false, false, null);
HoodieMultiTableDeltaStreamer streamer = new
HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> executionContexts =
streamer.getTableExecutionContexts();
TypedProperties properties = executionContexts.get(1).getProperties();
-
properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source_uber.avsc");
-
properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target_uber.avsc");
+
properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
basePath + "/source_uber.avsc");
+
properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
basePath + "/target_uber.avsc");
properties.setProperty("hoodie.datasource.write.partitionpath.field",
"timestamp");
properties.setProperty("hoodie.deltastreamer.source.kafka.topic",
topicName2);
executionContexts.get(1).setProperties(properties);
TypedProperties properties1 = executionContexts.get(0).getProperties();
-
properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source_short_trip_uber.avsc");
-
properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target_short_trip_uber.avsc");
+
properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
basePath + "/source_short_trip_uber.avsc");
+
properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
basePath + "/target_short_trip_uber.avsc");
properties1.setProperty("hoodie.datasource.write.partitionpath.field",
"timestamp");
properties1.setProperty("hoodie.deltastreamer.source.kafka.topic",
topicName1);
executionContexts.get(0).setProperties(properties1);
@@ -198,15 +198,15 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
@Test
public void testMultiTableExecutionWithParquetSource() throws IOException {
// ingest test data to 2 parquet source paths
- String parquetSourceRoot1 = dfsBasePath + "/parquetSrcPath1/";
+ String parquetSourceRoot1 = basePath + "/parquetSrcPath1/";
prepareParquetDFSFiles(10, parquetSourceRoot1);
- String parquetSourceRoot2 = dfsBasePath + "/parquetSrcPath2/";
+ String parquetSourceRoot2 = basePath + "/parquetSrcPath2/";
prepareParquetDFSFiles(5, parquetSourceRoot2);
// add only common props. later we can add per table props
String parquetPropsFile = populateCommonPropsAndWriteToFile();
- HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config",
ParquetDFSSource.class.getName(), false, false,
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(parquetPropsFile, basePath + "/config",
ParquetDFSSource.class.getName(), false, false,
false, "multi_table_parquet", null);
HoodieMultiTableDeltaStreamer streamer = new
HoodieMultiTableDeltaStreamer(cfg, jsc);
@@ -237,7 +237,7 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
@Test
public void testTableLevelProperties() throws IOException {
- HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config",
TestDataSource.class.getName(), false, false, null);
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config",
TestDataSource.class.getName(), false, false, null);
HoodieMultiTableDeltaStreamer streamer = new
HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> tableExecutionContexts =
streamer.getTableExecutionContexts();
tableExecutionContexts.forEach(tableExecutionContext -> {
@@ -255,8 +255,8 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
private String populateCommonPropsAndWriteToFile() throws IOException {
TypedProperties commonProps = new TypedProperties();
- populateCommonProps(commonProps, dfsBasePath);
- UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, dfs, dfsBasePath +
"/" + PROPS_FILENAME_TEST_PARQUET);
+ populateCommonProps(commonProps, basePath);
+ UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, fs, basePath + "/" +
PROPS_FILENAME_TEST_PARQUET);
return PROPS_FILENAME_TEST_PARQUET;
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
index 37abaa56b1..1cda910b70 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -34,7 +34,7 @@ public class TestAvroDFSSource extends
AbstractDFSSourceTestBase {
@BeforeEach
public void setup() throws Exception {
super.setup();
- this.dfsRoot = dfsBasePath + "/avroFiles";
+ this.dfsRoot = basePath + "/avroFiles";
this.fileSuffix = ".avro";
}
@@ -53,4 +53,4 @@ public class TestAvroDFSSource extends
AbstractDFSSourceTestBase {
protected void writeNewDataToFile(List<HoodieRecord> records, Path path)
throws IOException {
Helpers.saveAvroToDFS(Helpers.toGenericRecords(records), path);
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
index 7b8eead14f..8ffa3173e1 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
@@ -38,7 +38,7 @@ public class TestCsvDFSSource extends
AbstractDFSSourceTestBase {
@BeforeEach
public void setup() throws Exception {
super.setup();
- this.dfsRoot = dfsBasePath + "/jsonFiles";
+ this.dfsRoot = basePath + "/jsonFiles";
this.fileSuffix = ".json";
this.useFlattenedSchema = true;
this.schemaProvider = new FilebasedSchemaProvider(
@@ -57,6 +57,6 @@ public class TestCsvDFSSource extends
AbstractDFSSourceTestBase {
@Override
public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws
IOException {
UtilitiesTestBase.Helpers.saveCsvToDFS(
- true, '\t', Helpers.jsonifyRecords(records), dfs, path.toString());
+ true, '\t', Helpers.jsonifyRecords(records), fs, path.toString());
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
index 1ac3f91fa8..802a338b5c 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
@@ -58,7 +58,7 @@ public class TestGcsEventsSource extends UtilitiesTestBase {
@BeforeAll
public static void beforeAll() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices();
}
@BeforeEach
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
index 76c1c50b09..fde10b2d9a 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
@@ -37,7 +37,7 @@ public class TestJsonDFSSource extends
AbstractDFSSourceTestBase {
@BeforeEach
public void setup() throws Exception {
super.setup();
- this.dfsRoot = dfsBasePath + "/jsonFiles";
+ this.dfsRoot = basePath + "/jsonFiles";
this.fileSuffix = ".json";
}
@@ -51,6 +51,6 @@ public class TestJsonDFSSource extends
AbstractDFSSourceTestBase {
@Override
public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws
IOException {
UtilitiesTestBase.Helpers.saveStringsToDFS(
- Helpers.jsonifyRecords(records), dfs, path.toString());
+ Helpers.jsonifyRecords(records), fs, path.toString());
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
index 5ad590a82f..44489037e8 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
@@ -36,7 +36,7 @@ public class TestParquetDFSSource extends
AbstractDFSSourceTestBase {
@BeforeEach
public void setup() throws Exception {
super.setup();
- this.dfsRoot = dfsBasePath + "/parquetFiles";
+ this.dfsRoot = basePath + "/parquetFiles";
this.fileSuffix = ".parquet";
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
index 3d89dc2bc9..55c9a6b86f 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
@@ -47,9 +47,9 @@ public class TestS3EventsSource extends
AbstractCloudObjectsSourceTestBase {
@BeforeEach
public void setup() throws Exception {
super.setup();
- this.dfsRoot = dfsBasePath + "/parquetFiles";
+ this.dfsRoot = basePath + "/parquetFiles";
this.fileSuffix = ".parquet";
- dfs.mkdirs(new Path(dfsRoot));
+ fs.mkdirs(new Path(dfsRoot));
}
@AfterEach
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
index f2b3b1df94..35bc7884d7 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
@@ -18,8 +18,6 @@
package org.apache.hudi.utilities.sources;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -27,6 +25,9 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
@@ -59,7 +60,7 @@ public class TestSqlSource extends UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices();
}
@AfterAll
@@ -69,8 +70,8 @@ public class TestSqlSource extends UtilitiesTestBase {
@BeforeEach
public void setup() throws Exception {
- dfsRoot = UtilitiesTestBase.dfsBasePath + "/parquetFiles";
- UtilitiesTestBase.dfs.mkdirs(new Path(dfsRoot));
+ dfsRoot = UtilitiesTestBase.basePath + "/parquetFiles";
+ UtilitiesTestBase.fs.mkdirs(new Path(dfsRoot));
props = new TypedProperties();
super.setup();
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(),
jsc);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index e3bf39ad7d..2ac1b8b0bf 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -64,7 +64,7 @@ public abstract class TestAbstractDebeziumSource extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices();
}
@AfterAll
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index ff7d6cc2ed..534e14cc73 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -52,9 +52,9 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hive.service.server.HiveServer2;
import org.apache.log4j.Level;
@@ -74,6 +74,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
import java.io.BufferedReader;
import java.io.FileInputStream;
@@ -102,19 +103,22 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
*/
public class UtilitiesTestBase {
- protected static String dfsBasePath;
+ @TempDir
+ protected static java.nio.file.Path sharedTempDir;
+ protected static FileSystem fs;
+ protected static String basePath;
protected static HdfsTestService hdfsTestService;
protected static MiniDFSCluster dfsCluster;
- protected static DistributedFileSystem dfs;
- protected transient JavaSparkContext jsc = null;
- protected transient HoodieSparkEngineContext context = null;
- protected transient SparkSession sparkSession = null;
- protected transient SQLContext sqlContext;
protected static HiveServer2 hiveServer;
protected static HiveTestService hiveTestService;
protected static ZookeeperTestService zookeeperTestService;
private static final ObjectMapper MAPPER = new ObjectMapper();
+ protected transient JavaSparkContext jsc;
+ protected transient HoodieSparkEngineContext context;
+ protected transient SparkSession sparkSession;
+ protected transient SQLContext sqlContext;
+
@BeforeAll
public static void setLogLevel() {
Logger rootLogger = Logger.getRootLogger();
@@ -122,22 +126,33 @@ public class UtilitiesTestBase {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
}
- public static void initTestServices(boolean needsHive, boolean
needsZookeeper) throws Exception {
+ public static void initTestServices() throws Exception {
+ initTestServices(false, false, false);
+ }
- if (hdfsTestService == null) {
- hdfsTestService = new HdfsTestService();
+ public static void initTestServices(boolean needsHdfs, boolean needsHive,
boolean needsZookeeper) throws Exception {
+ final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
hadoopConf.set("hive.exec.scratchdir", System.getProperty("java.io.tmpdir") +
"/hive");
+
+ if (needsHdfs) {
+ hdfsTestService = new HdfsTestService(hadoopConf);
dfsCluster = hdfsTestService.start(true);
- dfs = dfsCluster.getFileSystem();
- dfsBasePath = dfs.getWorkingDirectory().toString();
- dfs.mkdirs(new Path(dfsBasePath));
+ fs = dfsCluster.getFileSystem();
+ basePath = fs.getWorkingDirectory().toString();
+ fs.mkdirs(new Path(basePath));
+ } else {
+ fs = FileSystem.getLocal(hadoopConf);
+ basePath = sharedTempDir.toUri().toString();
}
+
if (needsHive) {
- hiveTestService = new HiveTestService(hdfsTestService.getHadoopConf());
+ hiveTestService = new HiveTestService(hadoopConf);
hiveServer = hiveTestService.start();
- clearHiveDb();
+ clearHiveDb(basePath + "/dummy" + System.currentTimeMillis());
}
+
if (needsZookeeper) {
- zookeeperTestService = new
ZookeeperTestService(hdfsTestService.getHadoopConf());
+ zookeeperTestService = new ZookeeperTestService(hadoopConf);
zookeeperTestService.start();
}
}
@@ -213,14 +228,14 @@ public class UtilitiesTestBase {
*
* @throws IOException
*/
- private static void clearHiveDb() throws Exception {
+ private static void clearHiveDb(String tempWriteablePath) throws Exception {
// Create Dummy hive sync config
- HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy");
+ HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tempWriteablePath,
"dummy");
hiveSyncConfig.setHadoopConf(hiveTestService.getHiveConf());
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(hiveSyncConfig.getString(META_SYNC_TABLE_NAME))
- .initTable(dfs.getConf(), hiveSyncConfig.getString(META_SYNC_BASE_PATH));
+ .initTable(fs.getConf(), hiveSyncConfig.getString(META_SYNC_BASE_PATH));
QueryBasedDDLExecutor ddlExecutor = new JDBCExecutor(hiveSyncConfig);
ddlExecutor.runSQL("drop database if exists " +
hiveSyncConfig.getString(META_SYNC_DATABASE_NAME));
@@ -375,16 +390,16 @@ public class UtilitiesTestBase {
}
public static TypedProperties setupSchemaOnDFS(String scope, String
filename) throws IOException {
- UtilitiesTestBase.Helpers.copyToDFS(scope + "/" + filename, dfs,
dfsBasePath + "/" + filename);
+ UtilitiesTestBase.Helpers.copyToDFS(scope + "/" + filename, fs, basePath
+ "/" + filename);
TypedProperties props = new TypedProperties();
-
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/" + filename);
+
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
basePath + "/" + filename);
return props;
}
public static TypedProperties setupSchemaOnDFSWithAbsoluteScope(String
scope, String filename) throws IOException {
- UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(scope + "/" +
filename, dfs, dfsBasePath + "/" + filename);
+ UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(scope + "/" +
filename, fs, basePath + "/" + filename);
TypedProperties props = new TypedProperties();
-
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/" + filename);
+
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
basePath + "/" + filename);
return props;
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
index 510843b170..d4593f64fd 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
@@ -54,7 +54,7 @@ public abstract class AbstractCloudObjectsSourceTestBase
extends UtilitiesTestBa
@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices();
}
@AfterAll
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index ea218f53be..d49d197fd5 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -62,7 +62,7 @@ public abstract class AbstractDFSSourceTestBase extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices(true, false, false);
}
@AfterAll
@@ -117,7 +117,7 @@ public abstract class AbstractDFSSourceTestBase extends
UtilitiesTestBase {
*/
@Test
public void testReadingFromSource() throws IOException {
- dfs.mkdirs(new Path(dfsRoot));
+ fs.mkdirs(new Path(dfsRoot));
SourceFormatAdapter sourceFormatAdapter = new
SourceFormatAdapter(prepareDFSSource());
// 1. Extract without any checkpoint => get all the data, respecting
sourceLimit
@@ -125,7 +125,7 @@ public abstract class AbstractDFSSourceTestBase extends
UtilitiesTestBase {
sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(),
Long.MAX_VALUE).getBatch());
// Test respecting sourceLimit
int sourceLimit = 10;
- RemoteIterator<LocatedFileStatus> files =
dfs.listFiles(generateOneFile("1", "000", 100), true);
+ RemoteIterator<LocatedFileStatus> files =
fs.listFiles(generateOneFile("1", "000", 100), true);
FileStatus file1Status = files.next();
assertTrue(file1Status.getLen() > sourceLimit);
assertEquals(Option.empty(),
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
index 3e1c5a92b1..74dafbd2ff 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -22,6 +22,7 @@ package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -49,19 +50,19 @@ public class TestSqlFileBasedTransformer extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices();
UtilitiesTestBase.Helpers.copyToDFS(
"delta-streamer-config/sql-file-transformer.sql",
- UtilitiesTestBase.dfs,
- UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql");
+ UtilitiesTestBase.fs,
+ UtilitiesTestBase.basePath + "/sql-file-transformer.sql");
UtilitiesTestBase.Helpers.copyToDFS(
"delta-streamer-config/sql-file-transformer-invalid.sql",
- UtilitiesTestBase.dfs,
- UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql");
+ UtilitiesTestBase.fs,
+ UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql");
UtilitiesTestBase.Helpers.copyToDFS(
"delta-streamer-config/sql-file-transformer-empty.sql",
- UtilitiesTestBase.dfs,
- UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql");
+ UtilitiesTestBase.fs,
+ UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql");
}
@AfterAll
@@ -98,7 +99,7 @@ public class TestSqlFileBasedTransformer extends
UtilitiesTestBase {
// Test if the class throws hoodie IO exception correctly when given a
incorrect config.
props.setProperty(
"hoodie.deltastreamer.transformer.sql.file",
- UtilitiesTestBase.dfsBasePath + "/non-exist-sql-file.sql");
+ UtilitiesTestBase.basePath + "/non-exist-sql-file.sql");
assertThrows(
HoodieIOException.class,
() -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows,
props));
@@ -109,7 +110,7 @@ public class TestSqlFileBasedTransformer extends
UtilitiesTestBase {
// Test if the SQL file based transformer works as expected for the
invalid SQL statements.
props.setProperty(
"hoodie.deltastreamer.transformer.sql.file",
- UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql");
+ UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql");
assertThrows(
ParseException.class,
() -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows,
props));
@@ -120,7 +121,7 @@ public class TestSqlFileBasedTransformer extends
UtilitiesTestBase {
// Test if the SQL file based transformer works as expected for the empty
SQL statements.
props.setProperty(
"hoodie.deltastreamer.transformer.sql.file",
- UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql");
+ UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql");
Dataset<Row> emptyRow = sqlFileTransformer.apply(jsc, sparkSession,
inputDatasetRows, props);
String[] actualRows =
emptyRow.as(Encoders.STRING()).collectAsList().toArray(new String[0]);
String[] expectedRows = emptyDatasetRow.collectAsList().toArray(new
String[0]);
@@ -132,7 +133,7 @@ public class TestSqlFileBasedTransformer extends
UtilitiesTestBase {
// Test if the SQL file based transformer works as expected for the
correct input.
props.setProperty(
"hoodie.deltastreamer.transformer.sql.file",
- UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql");
+ UtilitiesTestBase.basePath + "/sql-file-transformer.sql");
Dataset<Row> transformedRow =
sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props);