This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5cf2f26 [HUDI-1618] Fixing NPE with Parquet src in multi table delta
streamer (#2577)
5cf2f26 is described below
commit 5cf2f2618b6a59a831543b588fb3bb85bdf5f1e8
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Mar 7 16:40:40 2021 -0500
[HUDI-1618] Fixing NPE with Parquet src in multi table delta streamer
(#2577)
---
.../HoodieMultiTableDeltaStreamer.java | 7 +-
.../functional/TestHoodieDeltaStreamer.java | 44 +++++++---
.../TestHoodieMultiTableDeltaStreamer.java | 95 +++++++++++++++++++++-
3 files changed, 127 insertions(+), 19 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 9d5ca3c..be2fe54 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
/**
@@ -75,9 +76,9 @@ public class HoodieMultiTableDeltaStreamer {
FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ?
configFolder.substring(0, configFolder.length() - 1) : configFolder;
checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
- TypedProperties properties = UtilHelpers.readConfig(fs, new
Path(commonPropsFile), new ArrayList<>()).getConfig();
+ TypedProperties commonProperties = UtilHelpers.readConfig(fs, new
Path(commonPropsFile), new ArrayList<>()).getConfig();
//get the tables to be ingested and their corresponding config files from
this properties instance
- populateTableExecutionContextList(properties, configFolder, fs, config);
+ populateTableExecutionContextList(commonProperties, configFolder, fs,
config);
}
private void checkIfPropsFileAndConfigFolderExist(String commonPropsFile,
String configFolder, FileSystem fs) throws IOException {
@@ -147,7 +148,7 @@ public class HoodieMultiTableDeltaStreamer {
}
private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg,
TypedProperties typedProperties) {
- if
(cfg.schemaProviderClassName.equals(SchemaRegistryProvider.class.getName())) {
+ if (Objects.equals(cfg.schemaProviderClassName,
SchemaRegistryProvider.class.getName())) {
String schemaRegistryBaseUrl =
typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
String schemaRegistrySuffix =
typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP);
typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP,
schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) +
schemaRegistrySuffix);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 7fb5b18..7522c2d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -118,8 +118,9 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
public static final String PROPS_INVALID_TABLE_CONFIG_FILE =
"test-invalid-table-config.properties";
private static final String PROPS_FILENAME_TEST_INVALID =
"test-invalid.properties";
private static final String PROPS_FILENAME_TEST_CSV =
"test-csv-dfs-source.properties";
- private static final String PROPS_FILENAME_TEST_PARQUET =
"test-parquet-dfs-source.properties";
+ protected static final String PROPS_FILENAME_TEST_PARQUET =
"test-parquet-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_JSON_KAFKA =
"test-json-kafka-dfs-source.properties";
+ private static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
private static String PARQUET_SOURCE_ROOT;
private static String JSON_KAFKA_SOURCE_ROOT;
private static final int PARQUET_NUM_RECORDS = 5;
@@ -214,7 +215,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath +
"/" + PROPS_FILENAME_TEST_INVALID);
TypedProperties props1 = new TypedProperties();
- populateCommonProps(props1);
+ populateAllCommonProps(props1);
UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" +
PROPS_FILENAME_TEST_SOURCE1);
TypedProperties properties = new TypedProperties();
@@ -226,7 +227,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs,
dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
- prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
+ prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
}
private static void populateInvalidTableConfigFilePathProps(TypedProperties
props) {
@@ -236,20 +237,30 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
dfsBasePath + "/config/invalid_uber_config.properties");
}
- private static void populateCommonProps(TypedProperties props) {
+ private static void populateAllCommonProps(TypedProperties props) {
+ populateCommonProps(props);
+ populateCommonKafkaProps(props);
+ populateCommonHiveProps(props);
+ }
+
+ protected static void populateCommonProps(TypedProperties props) {
props.setProperty("hoodie.datasource.write.keygenerator.class",
TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat",
"yyyyMMdd");
props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
"short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
dfsBasePath + "/config/uber_config.properties");
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile",
dfsBasePath + "/config/short_trip_uber_config.properties");
+ }
+ protected static void populateCommonKafkaProps(TypedProperties props) {
//Kafka source properties
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets",
"earliest");
props.setProperty("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
String.valueOf(5000));
+ }
+ protected static void populateCommonHiveProps(TypedProperties props) {
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(),
"jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(),
"testdb2");
@@ -975,12 +986,16 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
}
private static void prepareParquetDFSFiles(int numRecords) throws
IOException {
- prepareParquetDFSFiles(numRecords, "1.parquet", false, null, null);
+ prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT);
}
- private static void prepareParquetDFSFiles(int numRecords, String fileName,
boolean useCustomSchema,
+ protected static void prepareParquetDFSFiles(int numRecords, String
baseParquetPath) throws IOException {
+ prepareParquetDFSFiles(numRecords, baseParquetPath,
FIRST_PARQUET_FILE_NAME, false, null, null);
+ }
+
+ protected static void prepareParquetDFSFiles(int numRecords, String
baseParquetPath, String fileName, boolean useCustomSchema,
String schemaStr, Schema schema) throws IOException {
- String path = PARQUET_SOURCE_ROOT + "/" + fileName;
+ String path = baseParquetPath + "/" + fileName;
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
if (useCustomSchema) {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
@@ -1006,13 +1021,18 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean
hasTransformer) throws IOException {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc",
"target.avsc",
- PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT);
+ PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false);
}
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean
hasTransformer, String sourceSchemaFile, String targetSchemaFile,
- String propsFileName, String parquetSourceRoot) throws IOException {
+ String propsFileName, String parquetSourceRoot, boolean addCommonProps)
throws IOException {
// Properties used for testing delta-streamer with Parquet source
TypedProperties parquetProps = new TypedProperties();
+
+ if (addCommonProps) {
+ populateCommonProps(parquetProps);
+ }
+
parquetProps.setProperty("include", "base.properties");
parquetProps.setProperty("hoodie.embed.timeline.server","false");
parquetProps.setProperty("hoodie.datasource.write.recordkey.field",
"_row_key");
@@ -1042,7 +1062,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
private void prepareJsonKafkaDFSSource(String propsFileName, String
autoResetValue, String topicName) throws IOException {
// Properties used for testing delta-streamer with JsonKafka source
TypedProperties props = new TypedProperties();
- populateCommonProps(props);
+ populateAllCommonProps(props);
props.setProperty("include", "base.properties");
props.setProperty("hoodie.embed.timeline.server","false");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -1065,10 +1085,10 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
// prep parquet source
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
int parquetRecords = 10;
- prepareParquetDFSFiles(parquetRecords,"1.parquet", true,
HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+ prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT,
FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA,
HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
prepareParquetDFSSource(true, false,"source_uber.avsc",
"target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
- PARQUET_SOURCE_ROOT);
+ PARQUET_SOURCE_ROOT, false);
// delta streamer w/ parquet source
String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index ad1b753..7b5ce9d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -26,7 +26,9 @@ import
org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -34,7 +36,9 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
+import java.util.Random;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -43,19 +47,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer
{
private static volatile Logger log =
LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);
+ private static final Random RANDOM = new Random();
static class TestHelpers {
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName,
String configFolder, String sourceClassName, boolean enableHiveSync) {
+ return getConfig(fileName, configFolder, sourceClassName,
enableHiveSync, true, "multi_table_dataset");
+ }
+
+ static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName,
String configFolder, String sourceClassName, boolean enableHiveSync,
+ boolean setSchemaProvider, String basePathPrefix) {
HoodieMultiTableDeltaStreamer.Config config = new
HoodieMultiTableDeltaStreamer.Config();
config.configFolder = configFolder;
config.targetTableName = "dummy_table";
- config.basePathPrefix = dfsBasePath + "/multi_table_dataset";
+ config.basePathPrefix = dfsBasePath + "/" + basePathPrefix;
config.propsFilePath = dfsBasePath + "/" + fileName;
config.tableType = "COPY_ON_WRITE";
config.sourceClassName = sourceClassName;
config.sourceOrderingField = "timestamp";
- config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
+ if (setSchemaProvider) {
+ config.schemaProviderClassName =
FilebasedSchemaProvider.class.getName();
+ }
config.enableHiveSync = enableHiveSync;
return config;
}
@@ -117,7 +129,7 @@ public class TestHoodieMultiTableDeltaStreamer extends
TestHoodieDeltaStreamer {
}
@Test //0 corresponds to fg
- public void testMultiTableExecution() throws IOException {
+ public void testMultiTableExecutionWithKafkaSource() throws IOException {
//create topics for each table
String topicName1 = "topic" + testNum++;
String topicName2 = "topic" + testNum;
@@ -128,7 +140,7 @@ public class TestHoodieMultiTableDeltaStreamer extends
TestHoodieDeltaStreamer {
testUtils.sendMessages(topicName1,
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5,
HoodieTestDataGenerator.TRIP_SCHEMA)));
testUtils.sendMessages(topicName2,
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10,
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
- HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config",
JsonKafkaSource.class.getName(), false);
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config",
JsonKafkaSource.class.getName(), false);
HoodieMultiTableDeltaStreamer streamer = new
HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> executionContexts =
streamer.getTableExecutionContexts();
TypedProperties properties = executionContexts.get(1).getProperties();
@@ -160,4 +172,79 @@ public class TestHoodieMultiTableDeltaStreamer extends
TestHoodieDeltaStreamer {
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2
+ "/*/*.parquet", sqlContext);
testNum++;
}
+
+ @Test
+ public void testMultiTableExecutionWithParquetSource() throws IOException {
+ // ingest test data to 2 parquet source paths
+ String parquetSourceRoot1 = dfsBasePath + "/parquetSrcPath1/";
+ prepareParquetDFSFiles(10, parquetSourceRoot1);
+ String parquetSourceRoot2 = dfsBasePath + "/parquetSrcPath2/";
+ prepareParquetDFSFiles(5, parquetSourceRoot2);
+
+ // add only common props. later we can add per table props
+ String parquetPropsFile = populateCommonPropsAndWriteToFile();
+
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config",
ParquetDFSSource.class.getName(), false,
+ false, "multi_table_parquet");
+ HoodieMultiTableDeltaStreamer streamer = new
HoodieMultiTableDeltaStreamer(cfg, jsc);
+
+ List<TableExecutionContext> executionContexts =
streamer.getTableExecutionContexts();
+ // fetch per parquet source props and add per table properties
+ ingestPerParquetSourceProps(executionContexts, Arrays.asList(new String[]
{parquetSourceRoot1, parquetSourceRoot2}));
+
+ String targetBasePath1 =
executionContexts.get(0).getConfig().targetBasePath;
+ String targetBasePath2 =
executionContexts.get(1).getConfig().targetBasePath;
+
+ // sync and verify
+ syncAndVerify(streamer, targetBasePath1, targetBasePath2, 10, 5);
+
+ int totalTable1Records = 10;
+ int totalTable2Records = 5;
+ // ingest multiple rounds and verify
+ for (int i = 0; i < 3; i++) {
+ int table1Records = 10 + RANDOM.nextInt(100);
+ int table2Records = 15 + RANDOM.nextInt(100);
+ prepareParquetDFSFiles(table1Records, parquetSourceRoot1, (i + 2) +
".parquet", false, null, null);
+ prepareParquetDFSFiles(table2Records, parquetSourceRoot2, (i + 2) +
".parquet", false, null, null);
+ totalTable1Records += table1Records;
+ totalTable2Records += table2Records;
+ // sync and verify
+ syncAndVerify(streamer, targetBasePath1, targetBasePath2,
totalTable1Records, totalTable2Records);
+ }
+ }
+
+ private String populateCommonPropsAndWriteToFile() throws IOException {
+ TypedProperties commonProps = new TypedProperties();
+ populateCommonProps(commonProps);
+ UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, dfs, dfsBasePath +
"/" + PROPS_FILENAME_TEST_PARQUET);
+ return PROPS_FILENAME_TEST_PARQUET;
+ }
+
+ private TypedProperties getParquetProps(String parquetSourceRoot) {
+ TypedProperties props = new TypedProperties();
+ props.setProperty("include", "base.properties");
+ props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+ props.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
+ props.setProperty("hoodie.deltastreamer.source.dfs.root",
parquetSourceRoot);
+ return props;
+ }
+
+ private void ingestPerParquetSourceProps(List<TableExecutionContext>
executionContexts, List<String> parquetSourceRoots) {
+ int counter = 0;
+ for (String parquetSourceRoot : parquetSourceRoots) {
+ TypedProperties properties =
executionContexts.get(counter).getProperties();
+ TypedProperties parquetProps = getParquetProps(parquetSourceRoot);
+ parquetProps.forEach((k, v) -> {
+ properties.setProperty(k.toString(), v.toString());
+ });
+ executionContexts.get(counter).setProperties(properties);
+ counter++;
+ }
+ }
+
+ private void syncAndVerify(HoodieMultiTableDeltaStreamer streamer, String
targetBasePath1, String targetBasePath2, long table1ExpectedRecords, long
table2ExpectedRecords) {
+ streamer.sync();
+
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table1ExpectedRecords,
targetBasePath1 + "/*/*.parquet", sqlContext);
+
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table2ExpectedRecords,
targetBasePath2 + "/*/*.parquet", sqlContext);
+ }
}