This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d28c0f  Enhance quickstart script for CI tests (#5508)
2d28c0f is described below

commit 2d28c0f8fce781e094dacd8d53b6299ee42c5979
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Jun 9 21:42:50 2020 -0700

    Enhance quickstart script for CI tests (#5508)
    
    - For all the Quickstarts, add shutdown hook once the instances are set up 
so that the hook can always be executed
    - Use TERM signal instead of KILL so that process can execute the shutdown 
hook
    - Wait for process to fully terminate before starting the next one
    - Correctly handle query response before the table being ready
---
 .github/workflows/scripts/.pinot_quickstart.sh     | 83 +++++++++++----------
 .travis.yml                                        |  3 +-
 .travis/.travis_quickstart.sh                      | 84 ++++++++++++----------
 .../org/apache/pinot/tools/HybridQuickstart.java   | 57 +++++++--------
 .../java/org/apache/pinot/tools/Quickstart.java    | 22 +++---
 .../org/apache/pinot/tools/RealtimeQuickStart.java | 35 ++++-----
 6 files changed, 145 insertions(+), 139 deletions(-)

diff --git a/.github/workflows/scripts/.pinot_quickstart.sh 
b/.github/workflows/scripts/.pinot_quickstart.sh
index 737bdec..faa5d04 100755
--- a/.github/workflows/scripts/.pinot_quickstart.sh
+++ b/.github/workflows/scripts/.pinot_quickstart.sh
@@ -51,21 +51,23 @@ if [ "${PASS}" != 0 ]; then
 fi
 
 # Quickstart
-DIST_BIN_DIR=$(ls -d pinot-distribution/target/apache-pinot-*/apache-pinot-*)
-cd "${DIST_BIN_DIR}" || exit
+DIST_BIN_DIR=`ls -d pinot-distribution/target/apache-pinot-*/apache-pinot-*`
+cd "${DIST_BIN_DIR}"
 
 # Test quick-start-batch
 bin/quick-start-batch.sh &
 PID=$!
 
 PASS=0
+
+# Wait for 30 seconds for table to be set up, then at most 5 minutes to reach 
the desired state
 sleep 30
-for i in $(seq 1 200)
+for i in $(seq 1 150)
 do
-  curl -X POST --header 'Accept: application/json'  -d '{"sql":"select 
count(*) from baseballStats limit 1","trace":false}' 
http://localhost:8000/query/sql
-  COUNT_STAR_RES=$(curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from baseballStats limit 1","trace":false}' 
http://localhost:8000/query/sql | jq '.resultTable.rows[0][0]')
-  if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]]; then
-    if [ "${COUNT_STAR_RES}" -eq 97889 ]; then
+  QUERY_RES=`curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from baseballStats limit 1","trace":false}' 
http://localhost:8000/query/sql`
+  if [ $? -eq 0 ]; then
+    COUNT_STAR_RES=`echo "${QUERY_RES}" | jq '.resultTable.rows[0][0]'`
+    if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]] && [ "${COUNT_STAR_RES}" -eq 
97889 ]; then
       PASS=1
       break
     fi
@@ -73,41 +75,52 @@ do
   sleep 2
 done
 
+cleanup () {
+  # Terminate the process and wait for the clean up to be done
+  kill "$1"
+  while true;
+  do
+    kill -0 "$1" && sleep 1 || break
+  done
+
+  # Delete ZK directory
+  rm -rf '/tmp/PinotAdmin/zkData'
+}
+
+cleanup "${PID}"
 if [ "${PASS}" -eq 0 ]; then
   echo 'Batch Quickstart failed: Cannot get correct result for count star 
query.'
   exit 1
 fi
 
-kill -9 $PID
-rm -rf /tmp/PinotAdmin/zkData
-
 # Test quick-start-streaming
 bin/quick-start-streaming.sh &
 PID=$!
 
 PASS=0
 RES_1=0
-sleep 30
 
-for i in $(seq 1 200)
+# Wait for 30 seconds for table to be set up, then at most 5 minutes to reach 
the desired state
+sleep 30
+for i in $(seq 1 150)
 do
-  curl -X POST --header 'Accept: application/json'  -d '{"sql":"select 
count(*) from meetupRsvp limit 1","trace":false}' 
http://localhost:8000/query/sql
-  COUNT_STAR_RES=$(curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from meetupRsvp limit 1","trace":false}' 
http://localhost:8000/query/sql | jq '.resultTable.rows[0][0]')
- if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]]; then
-    if [ "${COUNT_STAR_RES}" -gt 0 ]; then
+  QUERY_RES=`curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from meetupRsvp limit 1","trace":false}' 
http://localhost:8000/query/sql`
+  if [ $? -eq 0 ]; then
+    COUNT_STAR_RES=`echo "${QUERY_RES}" | jq '.resultTable.rows[0][0]'`
+    if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]] && [ "${COUNT_STAR_RES}" -gt 0 ]; 
then
       if [ "${RES_1}" -eq 0 ]; then
-        RES_1=${COUNT_STAR_RES}
+        RES_1="${COUNT_STAR_RES}"
         continue
+      elif [ "${COUNT_STAR_RES}" -gt "${RES_1}" ]; then
+        PASS=1
+        break
       fi
     fi
-    if [ "${COUNT_STAR_RES}" -gt "${RES_1}" ]; then
-      PASS=1
-      break
-    fi
   fi
   sleep 2
 done
 
+cleanup "${PID}"
 if [ "${PASS}" -eq 0 ]; then
   if [ "${RES_1}" -eq 0 ]; then
     echo 'Streaming Quickstart test failed: Cannot get correct result for 
count star query.'
@@ -117,9 +130,6 @@ if [ "${PASS}" -eq 0 ]; then
   exit 1
 fi
 
-kill -9 $PID
-rm -rf /tmp/PinotAdmin/zkData
-
 # Test quick-start-hybrid
 cd bin
 ./quick-start-hybrid.sh &
@@ -127,26 +137,28 @@ PID=$!
 
 PASS=0
 RES_1=0
+
+# Wait for 30 seconds for table to be set up, then at most 5 minutes to reach 
the desired state
 sleep 30
-for i in $(seq 1 200)
+for i in $(seq 1 150)
 do
-  curl -X POST --header 'Accept: application/json'  -d '{"sql":"select 
count(*) from airlineStats limit 1","trace":false}' 
http://localhost:8000/query/sql
-  COUNT_STAR_RES=$(curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from airlineStats limit 1","trace":false}' 
http://localhost:8000/query/sql | jq '.resultTable.rows[0][0]')
-  if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]]; then
-    if [ "${COUNT_STAR_RES}" -gt 0 ]; then
+  QUERY_RES=`curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from airlineStats limit 1","trace":false}' 
http://localhost:8000/query/sql`
+  if [ $? -eq 0 ]; then
+    COUNT_STAR_RES=`echo "${QUERY_RES}" | jq '.resultTable.rows[0][0]'`
+    if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]] && [ "${COUNT_STAR_RES}" -gt 0 ]; 
then
       if [ "${RES_1}" -eq 0 ]; then
-        RES_1=${COUNT_STAR_RES}
+        RES_1="${COUNT_STAR_RES}"
         continue
+      elif [ "${COUNT_STAR_RES}" -gt "${RES_1}" ]; then
+        PASS=1
+        break
       fi
     fi
-    if [ "${COUNT_STAR_RES}" -gt "${RES_1}" ]; then
-      PASS=1
-      break
-    fi
   fi
   sleep 2
 done
 
+cleanup "${PID}"
 if [ "${PASS}" -eq 0 ]; then
   if [ "${RES_1}" -eq 0 ]; then
     echo 'Hybrid Quickstart test failed: Cannot get correct result for count 
star query.'
@@ -156,9 +168,6 @@ if [ "${PASS}" -eq 0 ]; then
   exit 1
 fi
 
-kill -9 $PID
-rm -rf /tmp/PinotAdmin/zkData
-
 cd ../../../../../
 pwd
 mvn clean > /dev/null
diff --git a/.travis.yml b/.travis.yml
index 2315bc6..da44855 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,5 @@
 language: java
-os:
-  - linux
+os: linux
 dist: trusty
 
 before_install:
diff --git a/.travis/.travis_quickstart.sh b/.travis/.travis_quickstart.sh
index 121eefa..3a77b4b 100755
--- a/.travis/.travis_quickstart.sh
+++ b/.travis/.travis_quickstart.sh
@@ -30,60 +30,75 @@ java -version
 
 # Quickstart
 DIST_BIN_DIR=`ls -d pinot-distribution/target/apache-pinot-*/apache-pinot-*`
-cd ${DIST_BIN_DIR}
+cd "${DIST_BIN_DIR}"
 
 # Test quick-start-batch
 bin/quick-start-batch.sh &
 PID=$!
 
 PASS=0
+
+# Wait for 30 seconds for table to be set up, then at most 5 minutes to reach 
the desired state
 sleep 30
-for i in $(seq 1 200)
+for i in $(seq 1 150)
 do
-  COUNT_STAR_RES=`curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from baseballStats limit 1","trace":false}' 
http://localhost:8000/query/sql | jq '.resultTable.rows[0][0]'`
-  if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]]; then
-    if [ "${COUNT_STAR_RES}" -eq 97889 ]; then
+  QUERY_RES=`curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from baseballStats limit 1","trace":false}' 
http://localhost:8000/query/sql`
+  if [ $? -eq 0 ]; then
+    COUNT_STAR_RES=`echo "${QUERY_RES}" | jq '.resultTable.rows[0][0]'`
+    if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]] && [ "${COUNT_STAR_RES}" -eq 
97889 ]; then
       PASS=1
       break
     fi
   fi
-  sleep 1
+  sleep 2
 done
 
+cleanup () {
+  # Terminate the process and wait for the clean up to be done
+  kill "$1"
+  while true;
+  do
+    kill -0 "$1" && sleep 1 || break
+  done
+
+  # Delete ZK directory
+  rm -rf '/tmp/PinotAdmin/zkData'
+}
+
+cleanup "${PID}"
 if [ "${PASS}" -eq 0 ]; then
   echo 'Batch Quickstart failed: Cannot get correct result for count star 
query.'
   exit 1
 fi
 
-kill -9 $PID
-rm -rf /tmp/PinotAdmin/zkData
-
 # Test quick-start-streaming
 bin/quick-start-streaming.sh &
 PID=$!
 
 PASS=0
 RES_1=0
-sleep 30
 
-for i in $(seq 1 200)
+# Wait for 30 seconds for table to be set up, then at most 5 minutes to reach 
the desired state
+sleep 30
+for i in $(seq 1 150)
 do
-  COUNT_STAR_RES=`curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from meetupRsvp limit 1","trace":false}' 
http://localhost:8000/query/sql | jq '.resultTable.rows[0][0]'`
- if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]]; then
-    if [ "${COUNT_STAR_RES}" -gt 0 ]; then
+  QUERY_RES=`curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from meetupRsvp limit 1","trace":false}' 
http://localhost:8000/query/sql`
+  if [ $? -eq 0 ]; then
+    COUNT_STAR_RES=`echo "${QUERY_RES}" | jq '.resultTable.rows[0][0]'`
+    if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]] && [ "${COUNT_STAR_RES}" -gt 0 ]; 
then
       if [ "${RES_1}" -eq 0 ]; then
-        RES_1=${COUNT_STAR_RES}
+        RES_1="${COUNT_STAR_RES}"
         continue
+      elif [ "${COUNT_STAR_RES}" -gt "${RES_1}" ]; then
+        PASS=1
+        break
       fi
     fi
-    if [ "${COUNT_STAR_RES}" -gt "${RES_1}" ]; then
-      PASS=1
-      break
-    fi
   fi
-  sleep 1
+  sleep 2
 done
 
+cleanup "${PID}"
 if [ "${PASS}" -eq 0 ]; then
   if [ "${RES_1}" -eq 0 ]; then
     echo 'Streaming Quickstart test failed: Cannot get correct result for 
count star query.'
@@ -93,9 +108,6 @@ if [ "${PASS}" -eq 0 ]; then
   exit 1
 fi
 
-kill -9 $PID
-rm -rf /tmp/PinotAdmin/zkData
-
 # Test quick-start-hybrid
 cd bin
 ./quick-start-hybrid.sh &
@@ -103,25 +115,28 @@ PID=$!
 
 PASS=0
 RES_1=0
+
+# Wait for 30 seconds for table to be set up, then at most 5 minutes to reach 
the desired state
 sleep 30
-for i in $(seq 1 200)
+for i in $(seq 1 150)
 do
-  COUNT_STAR_RES=`curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from airlineStats limit 1","trace":false}' 
http://localhost:8000/query/sql | jq '.resultTable.rows[0][0]'`
-  if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]]; then
-    if [ "${COUNT_STAR_RES}" -gt 0 ]; then
+  QUERY_RES=`curl -X POST --header 'Accept: application/json'  -d 
'{"sql":"select count(*) from airlineStats limit 1","trace":false}' 
http://localhost:8000/query/sql`
+  if [ $? -eq 0 ]; then
+    COUNT_STAR_RES=`echo "${QUERY_RES}" | jq '.resultTable.rows[0][0]'`
+    if [[ "${COUNT_STAR_RES}" =~ ^[0-9]+$ ]] && [ "${COUNT_STAR_RES}" -gt 0 ]; 
then
       if [ "${RES_1}" -eq 0 ]; then
-        RES_1=${COUNT_STAR_RES}
+        RES_1="${COUNT_STAR_RES}"
         continue
+      elif [ "${COUNT_STAR_RES}" -gt "${RES_1}" ]; then
+        PASS=1
+        break
       fi
     fi
-    if [ "${COUNT_STAR_RES}" -gt "${RES_1}" ]; then
-      PASS=1
-      break
-    fi
   fi
-  sleep 1
+  sleep 2
 done
 
+cleanup "${PID}"
 if [ "${PASS}" -eq 0 ]; then
   if [ "${RES_1}" -eq 0 ]; then
     echo 'Hybrid Quickstart test failed: Cannot get correct result for count 
star query.'
@@ -131,9 +146,6 @@ if [ "${PASS}" -eq 0 ]; then
   exit 1
 fi
 
-kill -9 $PID
-rm -rf /tmp/PinotAdmin/zkData
-
 cd ../../../../../
 pwd
 mvn clean > /dev/null
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index a59ecf2..bbb204a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -85,12 +85,12 @@ public class HybridQuickstart {
     _dataFile = new File(configDir, "airlineStats_data.avro");
     _realtimeTableConfigFile = new File(configDir, 
"airlineStats_realtime_table_config.json");
 
-    URL resource = Quickstart.class.getClassLoader().getResource(
-        
"examples/stream/airlineStats/airlineStats_realtime_table_config.json");
+    URL resource = Quickstart.class.getClassLoader()
+        
.getResource("examples/stream/airlineStats/airlineStats_realtime_table_config.json");
     Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, _realtimeTableConfigFile);
-    resource = Quickstart.class.getClassLoader().getResource(
-        "examples/stream/airlineStats/sample_data/airlineStats_data.avro");
+    resource = Quickstart.class.getClassLoader()
+        
.getResource("examples/stream/airlineStats/sample_data/airlineStats_data.avro");
     Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, _dataFile);
 
@@ -100,7 +100,8 @@ public class HybridQuickstart {
   private void startKafka() {
     _zookeeperInstance = ZkStarter.startLocalZkServer();
     try {
-      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
 KafkaStarterUtils.getDefaultKafkaConfiguration());
+      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
+          KafkaStarterUtils.getDefaultKafkaConfiguration());
     } catch (Exception e) {
       throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
     }
@@ -122,19 +123,31 @@ public class HybridQuickstart {
         new QuickstartRunner(Lists.newArrayList(offlineRequest, 
realtimeTableRequest), 1, 1, 1, dataDir);
     printStatus(Color.YELLOW, "***** Starting Kafka  *****");
     startKafka();
+    printStatus(Color.YELLOW, "***** Starting airline data stream and 
publishing to Kafka *****");
+    Schema schema = Schema.fromFile(_schemaFile);
+    TableConfig tableConfig = JsonUtils.fileToObject(_realtimeTableConfigFile, 
TableConfig.class);
+    AirlineDataStream stream = new AirlineDataStream(schema, tableConfig, 
_dataFile);
+    stream.run();
     printStatus(Color.YELLOW, "***** Starting Zookeeper, 1 servers, 1 brokers 
and 1 controller *****");
     runner.startAll();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        printStatus(Color.GREEN, "***** Shutting down hybrid quick start 
*****");
+        runner.stop();
+        stream.shutdown();
+        _kafkaStarter.stop();
+        ZkStarter.stopLocalZkServer(_zookeeperInstance);
+        FileUtils.deleteDirectory(quickstartTmpDir);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }));
     printStatus(Color.YELLOW, "***** Adding airlineStats offline and realtime 
table *****");
     runner.addTable();
-    printStatus(Color.YELLOW, "***** Launch data ingestion job to build index 
segments for airlineStats and push to controller *****");
+    printStatus(Color.YELLOW,
+        "***** Launch data ingestion job to build index segments for 
airlineStats and push to controller *****");
     runner.launchDataIngestionJob();
 
-    printStatus(Color.YELLOW, "***** Starting airline data stream and 
publishing to Kafka *****");
-    Schema schema = Schema.fromFile(_schemaFile);
-    TableConfig tableConfig = JsonUtils.fileToObject(_realtimeTableConfigFile, 
TableConfig.class);
-    final AirlineDataStream stream = new AirlineDataStream(schema, 
tableConfig, _dataFile);
-    stream.run();
-
     printStatus(Color.YELLOW, "***** Pinot Hybrid with hybrid table setup is 
complete *****");
     printStatus(Color.YELLOW, "***** Sequence of operations *****");
     printStatus(Color.YELLOW, "*****    1. Started 1 controller instance where 
tenant creation is enabled *****");
@@ -147,8 +160,7 @@ public class HybridQuickstart {
     printStatus(Color.YELLOW, "*****    7. Built and pushed an offline segment 
*****");
     printStatus(Color.YELLOW,
         "*****    8. Started publishing a Kafka stream for the realtime 
instance to start consuming *****");
-    printStatus(Color.YELLOW,
-        "*****    9. Sleep 5 Seconds to wait for all components brought up 
*****");
+    printStatus(Color.YELLOW, "*****    9. Sleep 5 Seconds to wait for all 
components brought up *****");
     Thread.sleep(5000);
 
     String q1 = "select count(*) from airlineStats limit 1";
@@ -182,22 +194,5 @@ public class HybridQuickstart {
     printStatus(Color.GREEN, 
"***************************************************");
 
     printStatus(Color.GREEN, "You can always go to http://localhost:9000/query 
to play around in the query console");
-
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        try {
-          printStatus(Color.GREEN, "***** Shutting down hybrid quick start 
*****");
-          stream.shutdown();
-          Thread.sleep(2000);
-          runner.stop();
-          _kafkaStarter.stop();
-          ZkStarter.stopLocalZkServer(_zookeeperInstance);
-          FileUtils.deleteDirectory(quickstartTmpDir);
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-    });
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
index e13f7c3..d942a24 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
@@ -165,6 +165,15 @@ public class Quickstart {
 
     printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and 
server *****");
     runner.startAll();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        printStatus(Color.GREEN, "***** Shutting down offline quick start 
*****");
+        runner.stop();
+        FileUtils.deleteDirectory(quickstartTmpDir);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }));
     printStatus(Color.CYAN, "***** Adding baseballStats table *****");
     runner.addTable();
     printStatus(Color.CYAN, "***** Launch data ingestion job to build index 
segment for baseballStats and push to controller *****");
@@ -172,19 +181,6 @@ public class Quickstart {
     printStatus(Color.CYAN, "***** Waiting for 5 seconds for the server to 
fetch the assigned segment *****");
     Thread.sleep(5000);
 
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        try {
-          printStatus(Color.GREEN, "***** Shutting down offline quick start 
*****");
-          runner.stop();
-          FileUtils.deleteDirectory(quickstartTmpDir);
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-    });
-
     printStatus(Color.YELLOW, "***** Offline quickstart setup complete *****");
 
     String q1 = "select count(*) from baseballStats limit 1";
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 48d767f..488a280 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -76,33 +76,28 @@ public class RealtimeQuickStart {
     }
     _kafkaStarter.start();
     _kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(10));
-
+    printStatus(Color.CYAN, "***** Starting meetup data stream and publishing 
to Kafka *****");
+    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(schemaFile);
+    meetupRSVPProvider.run();
     printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and 
broker *****");
     runner.startAll();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
+        runner.stop();
+        meetupRSVPProvider.stopPublishing();
+        _kafkaStarter.stop();
+        ZkStarter.stopLocalZkServer(zookeeperInstance);
+        FileUtils.deleteDirectory(quickstartTmpDir);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }));
     printStatus(Color.CYAN, "***** Adding meetupRSVP table *****");
     runner.addTable();
-    printStatus(Color.CYAN, "***** Starting meetup data stream and publishing 
to Kafka *****");
-    final MeetupRsvpStream meetupRSVPProvider = new 
MeetupRsvpStream(schemaFile);
-    meetupRSVPProvider.run();
     printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to 
get populated *****");
     Thread.sleep(5000);
 
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        try {
-          printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
-          meetupRSVPProvider.stopPublishing();
-          runner.stop();
-          _kafkaStarter.stop();
-          ZkStarter.stopLocalZkServer(zookeeperInstance);
-          FileUtils.deleteDirectory(quickstartTmpDir);
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-    });
-
     printStatus(Color.YELLOW, "***** Realtime quickstart setup complete 
*****");
 
     String q1 = "select count(*) from meetupRsvp limit 1";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to