This is an automated email from the ASF dual-hosted git repository.

bhavanisudha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git

commit 1f6be820f327503b460f9e8c372c07d6b0918fca Author: Satish Kotha <[email protected]> AuthorDate: Thu Apr 2 11:05:54 2020 -0700 [HUDI-758] Modify Integration test to include incremental queries for MOR tables --- ...n_commit_time.sh => get_min_commit_time_cow.sh} | 0 ...n_commit_time.sh => get_min_commit_time_mor.sh} | 2 +- ...ntal.commands => hive-incremental-cow.commands} | 2 +- ...l.commands => hive-incremental-mor-ro.commands} | 8 ++-- ...l.commands => hive-incremental-mor-rt.commands} | 8 ++-- .../org/apache/hudi/integ/ITTestHoodieDemo.java | 45 +++++++++++++++------- 6 files changed, 41 insertions(+), 24 deletions(-) diff --git a/docker/demo/get_min_commit_time.sh b/docker/demo/get_min_commit_time_cow.sh similarity index 100% copy from docker/demo/get_min_commit_time.sh copy to docker/demo/get_min_commit_time_cow.sh diff --git a/docker/demo/get_min_commit_time.sh b/docker/demo/get_min_commit_time_mor.sh similarity index 90% rename from docker/demo/get_min_commit_time.sh rename to docker/demo/get_min_commit_time_mor.sh index 264793f..190ed97 100755 --- a/docker/demo/get_min_commit_time.sh +++ b/docker/demo/get_min_commit_time_mor.sh @@ -16,5 +16,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -MIN_COMMIT_TIME=`hdfs dfs -ls -t /user/hive/warehouse/stock_ticks_cow/.hoodie/*.commit | head -1 | awk -F'/' ' { print $7 } ' | awk -F'.' ' { print $1 } '` +MIN_COMMIT_TIME=`hdfs dfs -ls -t /user/hive/warehouse/stock_ticks_mor/.hoodie/*.deltacommit | head -1 | awk -F'/' ' { print $7 } ' | awk -F'.' 
' { print $1 } '` echo $MIN_COMMIT_TIME; diff --git a/docker/demo/hive-incremental.commands b/docker/demo/hive-incremental-cow.commands similarity index 93% copy from docker/demo/hive-incremental.commands copy to docker/demo/hive-incremental-cow.commands index 9b52c3d..7f43548 100644 --- a/docker/demo/hive-incremental.commands +++ b/docker/demo/hive-incremental-cow.commands @@ -19,7 +19,7 @@ add jar ${hudi.hadoop.bundle}; set hoodie.stock_ticks_cow.consume.mode=INCREMENTAL; set hoodie.stock_ticks_cow.consume.max.commits=3; -set hoodie.stock_ticks_cow.consume.start.timestamp=${min.commit.time}; +set hoodie.stock_ticks_cow.consume.start.timestamp='${min.commit.time}'; select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}'; diff --git a/docker/demo/hive-incremental.commands b/docker/demo/hive-incremental-mor-ro.commands similarity index 72% copy from docker/demo/hive-incremental.commands copy to docker/demo/hive-incremental-mor-ro.commands index 9b52c3d..8b97c0a 100644 --- a/docker/demo/hive-incremental.commands +++ b/docker/demo/hive-incremental-mor-ro.commands @@ -17,11 +17,11 @@ add jar ${hudi.hadoop.bundle}; -set hoodie.stock_ticks_cow.consume.mode=INCREMENTAL; -set hoodie.stock_ticks_cow.consume.max.commits=3; -set hoodie.stock_ticks_cow.consume.start.timestamp=${min.commit.time}; +set hoodie.stock_ticks_mor.consume.mode=INCREMENTAL; +set hoodie.stock_ticks_mor.consume.max.commits=3; +set hoodie.stock_ticks_mor.consume.start.timestamp='${min.commit.time}'; -select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}'; +select symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}'; !quit diff --git a/docker/demo/hive-incremental.commands b/docker/demo/hive-incremental-mor-rt.commands similarity index 72% rename from 
docker/demo/hive-incremental.commands rename to docker/demo/hive-incremental-mor-rt.commands index 9b52c3d..a81fb77 100644 --- a/docker/demo/hive-incremental.commands +++ b/docker/demo/hive-incremental-mor-rt.commands @@ -17,11 +17,11 @@ add jar ${hudi.hadoop.bundle}; -set hoodie.stock_ticks_cow.consume.mode=INCREMENTAL; -set hoodie.stock_ticks_cow.consume.max.commits=3; -set hoodie.stock_ticks_cow.consume.start.timestamp=${min.commit.time}; +set hoodie.stock_ticks_mor.consume.mode=INCREMENTAL; +set hoodie.stock_ticks_mor.consume.max.commits=3; +set hoodie.stock_ticks_mor.consume.start.timestamp='${min.commit.time}'; -select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}'; +select symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}'; !quit diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java index 01eecd0..d890031 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java @@ -53,7 +53,8 @@ public class ITTestHoodieDemo extends ITTestBase { private static final String MOR_TABLE_NAME = "stock_ticks_mor"; private static final String DEMO_CONTAINER_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/setup_demo_container.sh"; - private static final String MIN_COMMIT_TIME_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time.sh"; + private static final String MIN_COMMIT_TIME_COW_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time_cow.sh"; + private static final String MIN_COMMIT_TIME_MOR_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time_mor.sh"; private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh"; private static final String COMPACTION_COMMANDS = HOODIE_WS_ROOT + 
"/docker/demo/compaction.commands"; private static final String SPARKSQL_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch1.commands"; @@ -62,7 +63,9 @@ public class ITTestHoodieDemo extends ITTestBase { private static final String HIVE_TBLCHECK_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-table-check.commands"; private static final String HIVE_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch1.commands"; private static final String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands"; - private static final String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands"; + private static final String HIVE_INCREMENTAL_COW_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-cow.commands"; + private static final String HIVE_INCREMENTAL_MOR_RO_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-ro.commands"; + private static final String HIVE_INCREMENTAL_MOR_RT_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-rt.commands"; private static String HIVE_SYNC_CMD_FMT = " --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 " @@ -87,7 +90,7 @@ public class ITTestHoodieDemo extends ITTestBase { testHiveAfterSecondBatch(); testPrestoAfterSecondBatch(); testSparkSQLAfterSecondBatch(); - testIncrementalHiveQuery(); + testIncrementalHiveQueryBeforeCompaction(); testIncrementalSparkSQLQuery(); // compaction @@ -267,23 +270,37 @@ public class ITTestHoodieDemo extends ITTestBase { assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|"); } - private void testIncrementalHiveQuery() throws Exception { + private void testIncrementalHiveQuery(String minCommitTimeScript, String incrementalCommandsFile, + String expectedOutput, int expectedTimes) throws Exception { String minCommitTime = - executeCommandStringInDocker(ADHOC_2_CONTAINER, MIN_COMMIT_TIME_SCRIPT, 
true).getStdout().toString(); + executeCommandStringInDocker(ADHOC_2_CONTAINER, minCommitTimeScript, true).getStdout().toString(); Pair<String, String> stdOutErrPair = - executeHiveCommandFile(HIVE_INCREMENTAL_COMMANDS, "min.commit.time=" + minCommitTime + "`"); - assertStdOutContains(stdOutErrPair, "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |"); + executeHiveCommandFile(incrementalCommandsFile, "min.commit.time=" + minCommitTime + "`"); + assertStdOutContains(stdOutErrPair, expectedOutput, expectedTimes); + } + + private void testIncrementalHiveQueryBeforeCompaction() throws Exception { + String expectedOutputCOW = "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |"; + + // verify that 10:59 is present in COW table because there is no compaction process for COW + testIncrementalHiveQuery(MIN_COMMIT_TIME_COW_SCRIPT, HIVE_INCREMENTAL_COW_COMMANDS, expectedOutputCOW, 1); + + // verify that 10:59 is NOT present in RO table because of pending compaction + testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RO_COMMANDS, expectedOutputCOW, 0); + + // verify that 10:59 is present in RT table even with pending compaction + testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RT_COMMANDS, expectedOutputCOW, 1); } private void testIncrementalHiveQueryAfterCompaction() throws Exception { - String minCommitTime = - executeCommandStringInDocker(ADHOC_2_CONTAINER, MIN_COMMIT_TIME_SCRIPT, true).getStdout().toString(); - Pair<String, String> stdOutErrPair = - executeHiveCommandFile(HIVE_INCREMENTAL_COMMANDS, "min.commit.time=" + minCommitTime + "`"); - assertStdOutContains(stdOutErrPair, - "| symbol | ts | volume | open | close |\n" + String expectedOutput = "| symbol | ts | volume | open | close |\n" + "+---------+----------------------+---------+------------+-----------+\n" - + "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |"); + + "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |"; 
+ + // verify that 10:59 is present for all views because compaction is complete + testIncrementalHiveQuery(MIN_COMMIT_TIME_COW_SCRIPT, HIVE_INCREMENTAL_COW_COMMANDS, expectedOutput, 1); + testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RO_COMMANDS, expectedOutput, 1); + testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RT_COMMANDS, expectedOutput, 1); } private void testIncrementalSparkSQLQuery() throws Exception {
