This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cf2f26  [HUDI-1618] Fixing NPE with Parquet src in multi table delta 
streamer (#2577)
5cf2f26 is described below

commit 5cf2f2618b6a59a831543b588fb3bb85bdf5f1e8
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Mar 7 16:40:40 2021 -0500

    [HUDI-1618] Fixing NPE with Parquet src in multi table delta streamer 
(#2577)
---
 .../HoodieMultiTableDeltaStreamer.java             |  7 +-
 .../functional/TestHoodieDeltaStreamer.java        | 44 +++++++---
 .../TestHoodieMultiTableDeltaStreamer.java         | 95 +++++++++++++++++++++-
 3 files changed, 127 insertions(+), 19 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 9d5ca3c..be2fe54 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -75,9 +76,9 @@ public class HoodieMultiTableDeltaStreamer {
     FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
     configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? 
configFolder.substring(0, configFolder.length() - 1) : configFolder;
     checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
-    TypedProperties properties = UtilHelpers.readConfig(fs, new 
Path(commonPropsFile), new ArrayList<>()).getConfig();
+    TypedProperties commonProperties = UtilHelpers.readConfig(fs, new 
Path(commonPropsFile), new ArrayList<>()).getConfig();
     //get the tables to be ingested and their corresponding config files from 
this properties instance
-    populateTableExecutionContextList(properties, configFolder, fs, config);
+    populateTableExecutionContextList(commonProperties, configFolder, fs, 
config);
   }
 
   private void checkIfPropsFileAndConfigFolderExist(String commonPropsFile, 
String configFolder, FileSystem fs) throws IOException {
@@ -147,7 +148,7 @@ public class HoodieMultiTableDeltaStreamer {
   }
 
   private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, 
TypedProperties typedProperties) {
-    if 
(cfg.schemaProviderClassName.equals(SchemaRegistryProvider.class.getName())) {
+    if (Objects.equals(cfg.schemaProviderClassName, 
SchemaRegistryProvider.class.getName())) {
       String schemaRegistryBaseUrl = 
typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
       String schemaRegistrySuffix = 
typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP);
       typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, 
schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + 
schemaRegistrySuffix);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 7fb5b18..7522c2d 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -118,8 +118,9 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
   public static final String PROPS_INVALID_TABLE_CONFIG_FILE = 
"test-invalid-table-config.properties";
   private static final String PROPS_FILENAME_TEST_INVALID = 
"test-invalid.properties";
   private static final String PROPS_FILENAME_TEST_CSV = 
"test-csv-dfs-source.properties";
-  private static final String PROPS_FILENAME_TEST_PARQUET = 
"test-parquet-dfs-source.properties";
+  protected static final String PROPS_FILENAME_TEST_PARQUET = 
"test-parquet-dfs-source.properties";
   private static final String PROPS_FILENAME_TEST_JSON_KAFKA = 
"test-json-kafka-dfs-source.properties";
+  private static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
   private static String PARQUET_SOURCE_ROOT;
   private static String JSON_KAFKA_SOURCE_ROOT;
   private static final int PARQUET_NUM_RECORDS = 5;
@@ -214,7 +215,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + 
"/" + PROPS_FILENAME_TEST_INVALID);
 
     TypedProperties props1 = new TypedProperties();
-    populateCommonProps(props1);
+    populateAllCommonProps(props1);
     UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + 
PROPS_FILENAME_TEST_SOURCE1);
 
     TypedProperties properties = new TypedProperties();
@@ -226,7 +227,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, 
dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
 
-    prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
+    prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
   }
 
   private static void populateInvalidTableConfigFilePathProps(TypedProperties 
props) {
@@ -236,20 +237,30 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_uber_config.properties");
   }
 
-  private static void populateCommonProps(TypedProperties props) {
+  private static void populateAllCommonProps(TypedProperties props) {
+    populateCommonProps(props);
+    populateCommonKafkaProps(props);
+    populateCommonHiveProps(props);
+  }
+
+  protected static void populateCommonProps(TypedProperties props) {
     props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
     
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", 
"yyyyMMdd");
     props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", 
"short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
     
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/uber_config.properties");
     
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile",
 dfsBasePath + "/config/short_trip_uber_config.properties");
+  }
 
+  protected static void populateCommonKafkaProps(TypedProperties props) {
     //Kafka source properties
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", 
"earliest");
     props.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", 
String.valueOf(5000));
+  }
 
+  protected static void populateCommonHiveProps(TypedProperties props) {
     // Hive Configs
     props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), 
"jdbc:hive2://127.0.0.1:9999/");
     props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), 
"testdb2");
@@ -975,12 +986,16 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
   }
 
   private static void prepareParquetDFSFiles(int numRecords) throws 
IOException {
-    prepareParquetDFSFiles(numRecords, "1.parquet", false, null, null);
+    prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT);
   }
 
-  private static void prepareParquetDFSFiles(int numRecords, String fileName, 
boolean useCustomSchema,
+  protected static void prepareParquetDFSFiles(int numRecords, String 
baseParquetPath) throws IOException {
+    prepareParquetDFSFiles(numRecords, baseParquetPath, 
FIRST_PARQUET_FILE_NAME, false, null, null);
+  }
+
+  protected static void prepareParquetDFSFiles(int numRecords, String 
baseParquetPath, String fileName, boolean useCustomSchema,
       String schemaStr, Schema schema) throws IOException {
-    String path = PARQUET_SOURCE_ROOT + "/" + fileName;
+    String path = baseParquetPath + "/" + fileName;
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     if (useCustomSchema) {
       Helpers.saveParquetToDFS(Helpers.toGenericRecords(
@@ -1006,13 +1021,18 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
 
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer) throws IOException {
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", 
"target.avsc",
-        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT);
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false);
   }
 
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer, String sourceSchemaFile, String targetSchemaFile,
-      String propsFileName, String parquetSourceRoot) throws IOException {
+      String propsFileName, String parquetSourceRoot, boolean addCommonProps) 
throws IOException {
     // Properties used for testing delta-streamer with Parquet source
     TypedProperties parquetProps = new TypedProperties();
+
+    if (addCommonProps) {
+      populateCommonProps(parquetProps);
+    }
+
     parquetProps.setProperty("include", "base.properties");
     parquetProps.setProperty("hoodie.embed.timeline.server","false");
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
@@ -1042,7 +1062,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
   private void prepareJsonKafkaDFSSource(String propsFileName, String 
autoResetValue, String topicName) throws IOException {
     // Properties used for testing delta-streamer with JsonKafka source
     TypedProperties props = new TypedProperties();
-    populateCommonProps(props);
+    populateAllCommonProps(props);
     props.setProperty("include", "base.properties");
     props.setProperty("hoodie.embed.timeline.server","false");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -1065,10 +1085,10 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     // prep parquet source
     PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
     int parquetRecords = 10;
-    prepareParquetDFSFiles(parquetRecords,"1.parquet", true, 
HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+    prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, 
HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
 
     prepareParquetDFSSource(true, false,"source_uber.avsc", 
"target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT);
+        PARQUET_SOURCE_ROOT, false);
     // delta streamer w/ parquet source
     String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index ad1b753..7b5ce9d 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -26,7 +26,9 @@ import 
org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
 import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -34,7 +36,9 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -43,19 +47,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer 
{
 
   private static volatile Logger log = 
LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);
+  private static final Random RANDOM = new Random();
 
   static class TestHelpers {
 
     static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, 
String configFolder, String sourceClassName, boolean enableHiveSync) {
+      return getConfig(fileName, configFolder, sourceClassName, 
enableHiveSync, true, "multi_table_dataset");
+    }
+
+    static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, 
String configFolder, String sourceClassName, boolean enableHiveSync,
+        boolean setSchemaProvider, String basePathPrefix) {
       HoodieMultiTableDeltaStreamer.Config config = new 
HoodieMultiTableDeltaStreamer.Config();
       config.configFolder = configFolder;
       config.targetTableName = "dummy_table";
-      config.basePathPrefix = dfsBasePath + "/multi_table_dataset";
+      config.basePathPrefix = dfsBasePath + "/" + basePathPrefix;
       config.propsFilePath = dfsBasePath + "/" + fileName;
       config.tableType = "COPY_ON_WRITE";
       config.sourceClassName = sourceClassName;
       config.sourceOrderingField = "timestamp";
-      config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
+      if (setSchemaProvider) {
+        config.schemaProviderClassName = 
FilebasedSchemaProvider.class.getName();
+      }
       config.enableHiveSync = enableHiveSync;
       return config;
     }
@@ -117,7 +129,7 @@ public class TestHoodieMultiTableDeltaStreamer extends 
TestHoodieDeltaStreamer {
   }
 
   @Test //0 corresponds to fg
-  public void testMultiTableExecution() throws IOException {
+  public void testMultiTableExecutionWithKafkaSource() throws IOException {
     //create topics for each table
     String topicName1 = "topic" + testNum++;
     String topicName2 = "topic" + testNum;
@@ -128,7 +140,7 @@ public class TestHoodieMultiTableDeltaStreamer extends 
TestHoodieDeltaStreamer {
     testUtils.sendMessages(topicName1, 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, 
HoodieTestDataGenerator.TRIP_SCHEMA)));
     testUtils.sendMessages(topicName2, 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, 
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
 
-    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", 
JsonKafkaSource.class.getName(), false);
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", 
JsonKafkaSource.class.getName(), false);
     HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
     List<TableExecutionContext> executionContexts = 
streamer.getTableExecutionContexts();
     TypedProperties properties = executionContexts.get(1).getProperties();
@@ -160,4 +172,79 @@ public class TestHoodieMultiTableDeltaStreamer extends 
TestHoodieDeltaStreamer {
     TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 
+ "/*/*.parquet", sqlContext);
     testNum++;
   }
+
+  @Test
+  public void testMultiTableExecutionWithParquetSource() throws IOException {
+    // ingest test data to 2 parquet source paths
+    String parquetSourceRoot1 = dfsBasePath + "/parquetSrcPath1/";
+    prepareParquetDFSFiles(10, parquetSourceRoot1);
+    String parquetSourceRoot2 = dfsBasePath + "/parquetSrcPath2/";
+    prepareParquetDFSFiles(5, parquetSourceRoot2);
+
+    // add only common props. later we can add per table props
+    String parquetPropsFile = populateCommonPropsAndWriteToFile();
+
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", 
ParquetDFSSource.class.getName(), false,
+        false, "multi_table_parquet");
+    HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
+
+    List<TableExecutionContext> executionContexts = 
streamer.getTableExecutionContexts();
+    // fetch per parquet source props and add per table properties
+    ingestPerParquetSourceProps(executionContexts, Arrays.asList(new String[] 
{parquetSourceRoot1, parquetSourceRoot2}));
+
+    String targetBasePath1 = 
executionContexts.get(0).getConfig().targetBasePath;
+    String targetBasePath2 = 
executionContexts.get(1).getConfig().targetBasePath;
+
+    // sync and verify
+    syncAndVerify(streamer, targetBasePath1, targetBasePath2, 10, 5);
+
+    int totalTable1Records = 10;
+    int totalTable2Records = 5;
+    // ingest multiple rounds and verify
+    for (int i = 0; i < 3; i++) {
+      int table1Records = 10 + RANDOM.nextInt(100);
+      int table2Records = 15 + RANDOM.nextInt(100);
+      prepareParquetDFSFiles(table1Records, parquetSourceRoot1, (i + 2) + 
".parquet", false, null, null);
+      prepareParquetDFSFiles(table2Records, parquetSourceRoot2, (i + 2) + 
".parquet", false, null, null);
+      totalTable1Records += table1Records;
+      totalTable2Records += table2Records;
+      // sync and verify
+      syncAndVerify(streamer, targetBasePath1, targetBasePath2, 
totalTable1Records, totalTable2Records);
+    }
+  }
+
+  private String populateCommonPropsAndWriteToFile() throws IOException {
+    TypedProperties commonProps = new TypedProperties();
+    populateCommonProps(commonProps);
+    UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, dfs, dfsBasePath + 
"/" + PROPS_FILENAME_TEST_PARQUET);
+    return PROPS_FILENAME_TEST_PARQUET;
+  }
+
+  private TypedProperties getParquetProps(String parquetSourceRoot) {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("include", "base.properties");
+    props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", 
parquetSourceRoot);
+    return props;
+  }
+
+  private void ingestPerParquetSourceProps(List<TableExecutionContext> 
executionContexts, List<String> parquetSourceRoots) {
+    int counter = 0;
+    for (String parquetSourceRoot : parquetSourceRoots) {
+      TypedProperties properties = 
executionContexts.get(counter).getProperties();
+      TypedProperties parquetProps = getParquetProps(parquetSourceRoot);
+      parquetProps.forEach((k, v) -> {
+        properties.setProperty(k.toString(), v.toString());
+      });
+      executionContexts.get(counter).setProperties(properties);
+      counter++;
+    }
+  }
+
+  private void syncAndVerify(HoodieMultiTableDeltaStreamer streamer, String 
targetBasePath1, String targetBasePath2, long table1ExpectedRecords, long 
table2ExpectedRecords) {
+    streamer.sync();
+    
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table1ExpectedRecords, 
targetBasePath1 + "/*/*.parquet", sqlContext);
+    
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table2ExpectedRecords, 
targetBasePath2 + "/*/*.parquet", sqlContext);
+  }
 }

Reply via email to