This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ca0b590  Update quickstart and examples (#4970)
ca0b590 is described below

commit ca0b5906cb242269d30308f0ff1fdde73a12bcce
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Jan 11 16:17:43 2020 -0800

    Update quickstart and examples (#4970)
    
    * Move sample_data to examples
    
    * Make travis take kafka-2.0 into the default build
    
    * Make offline quickstart use launchDataIngestionJob instead of building the 
segment then pushing
    
    * Fix hybrid quickstart
---
 .travis.yml                                        |   8 +-
 .travis_install.sh                                 |   2 +-
 .travis_test.sh                                    |   2 +-
 docker/images/pinot/README.md                      |   4 +-
 docs/in_production.rst                             |   4 +-
 kubernetes/helm/pinot-realtime-quickstart.yml      |   4 +-
 .../skaffold/gke/pinot-realtime-quickstart.yml     |   4 +-
 pinot-distribution/pinot-assembly.xml              |  20 +--
 .../pinot-stream-ingestion/pinot-kafka-2.0/pom.xml |   4 +-
 .../stream/kafka/KafkaJSONMessageDecoder.java      |   2 +-
 .../org/apache/pinot/tools/HybridQuickstart.java   |  29 +++--
 .../java/org/apache/pinot/tools/Quickstart.java    |  18 +--
 .../apache/pinot/tools/QuickstartTableRequest.java |  12 +-
 .../org/apache/pinot/tools/RealtimeQuickStart.java |   4 +-
 .../tools/admin/command/QuickstartRunner.java      |  19 +--
 .../admin/command/StreamAvroIntoKafkaCommand.java  |  34 ++++-
 .../pinot/tools/streams/AirlineDataStream.java     |  19 +--
 .../baseballStats_offline_table_config.json        |   0
 .../batch/baseballStats}/baseballStats_schema.json |   0
 .../batch/baseballStats/ingestionJobSpec.yaml      | 139 +++++++++++++++++++++
 .../baseballStats/rawdata}/baseballStats_data.csv  |   0
 .../airlineStats_realtime_table_config.json        |   0
 .../stream/airlineStats}/airlineStats_schema.json  |   0
 .../docker/airlineStats_realtime_table_config.json |   0
 .../airlineStats_realtime_table_config.json        |   1 +
 .../airlineStats_realtime_table_config.json        |   0
 .../sample_data/airlineStats_data.avro             | Bin
 .../sample_data/airlineStats_data.json             |   0
 .../sample_data/airlineStats_data.orc              | Bin
 .../meetupRsvp_realtime_table_config.json          |   3 +-
 .../meetupRsvp_realtime_table_config.json          |   0
 .../meetupRsvp_realtime_table_config.json          |   0
 .../stream/meetupRsvp}/meetupRsvp_schema.json      |   0
 .../airlineStats_offline_table_config.json         |  21 ----
 .../airlineStats_realtime_table_config.json        |  37 ------
 .../docker/airlineStats_offline_table_config.json  |  21 ----
 .../meetupRsvp_realtime_table_config.json          |  28 -----
 pom.xml                                            |  14 ++-
 38 files changed, 266 insertions(+), 187 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 6750717..2fddfe1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -7,13 +7,13 @@ env:
     - JOBS=1
   matrix:
     - RUN_INTEGRATION_TESTS=true
-      KAFKA_VERSION=0.9
-    - RUN_INTEGRATION_TESTS=false
-      KAFKA_VERSION=0.9
-    - RUN_INTEGRATION_TESTS=true
       KAFKA_VERSION=2.0
     - RUN_INTEGRATION_TESTS=false
       KAFKA_VERSION=2.0
+    - RUN_INTEGRATION_TESTS=true
+      KAFKA_VERSION=0.9
+    - RUN_INTEGRATION_TESTS=false
+      KAFKA_VERSION=0.9
 
 jdk:
   - oraclejdk8
diff --git a/.travis_install.sh b/.travis_install.sh
index f6cd3a2..94990bf 100755
--- a/.travis_install.sh
+++ b/.travis_install.sh
@@ -39,7 +39,7 @@ if [ $noThirdEyeChange -eq 0 ]; then
 fi
 
 KAFKA_BUILD_OPTS=""
-if [ "$KAFKA_VERSION" != '0.9' ]; then
+if [ "$KAFKA_VERSION" != '2.0' ]; then
   KAFKA_BUILD_OPTS="-Dkafka.version=${KAFKA_VERSION}"
 fi
 
diff --git a/.travis_test.sh b/.travis_test.sh
index e3d3ad9..3ac82a2 100755
--- a/.travis_test.sh
+++ b/.travis_test.sh
@@ -58,7 +58,7 @@ fi
 passed=0
 
 KAFKA_BUILD_OPTS=""
-if [ "$KAFKA_VERSION" != '0.9' ]; then
+if [ "$KAFKA_VERSION" != '2.0' ]; then
   git diff --name-only $TRAVIS_COMMIT_RANGE | egrep '^(pinot-connectors)'
   if [ $? -ne 0 ]; then
     echo "No Pinot Connector Changes, Skip tests for Kafka Connector: 
${KAFKA_VERSION}."
diff --git a/docker/images/pinot/README.md b/docker/images/pinot/README.md
index 6d97b1f..f18aa79 100644
--- a/docker/images/pinot/README.md
+++ b/docker/images/pinot/README.md
@@ -131,12 +131,12 @@ docker-compose -f docker-compose.yml up
 
 Below is the script to create airlineStats table
 ```SHELL
-docker run --network=docker_default winedepot/pinot:0.3.0-SNAPSHOT AddTable 
-schemaFile sample_data/airlineStats_schema.json -tableConfigFile 
sample_data/docker/airlineStats_realtime_table_config.json -controllerHost 
pinot-controller -controllerPort 9000 -exec
+docker run --network=docker_default winedepot/pinot:0.3.0-SNAPSHOT AddTable 
-schemaFile examples/stream/airlineStats/airlineStats_schema.json 
-tableConfigFile 
examples/stream/airlineStats/docker/airlineStats_realtime_table_config.json 
-controllerHost pinot-controller -controllerPort 9000 -exec
 ```
 
 Below is the script to ingest airplane stats data to Kafka
 ```SHELL
-docker run --network=docker_default winedepot/pinot:0.3.0-SNAPSHOT 
StreamAvroIntoKafka -avroFile sample_data/airlineStats_data.avro -kafkaTopic 
flights-realtime -kafkaBrokerList kafka:9092 -zkAddress zookeeper:2181
+docker run --network=docker_default winedepot/pinot:0.3.0-SNAPSHOT 
StreamAvroIntoKafka -avroFile 
examples/stream/airlineStats/sample_data/airlineStats_data.avro -kafkaTopic 
flights-realtime -kafkaBrokerList kafka:9092 -zkAddress zookeeper:2181
 ```
 
 In order to query pinot, try to open `localhost:9000/query` from your browser.
diff --git a/docs/in_production.rst b/docs/in_production.rst
index 08b0605..2b1e06e 100644
--- a/docs/in_production.rst
+++ b/docs/in_production.rst
@@ -76,8 +76,8 @@ Here is an example of invoking the command to create a pinot 
segment:
 
 .. code-block:: none
 
-  $ 
./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/bin/pinot-admin.sh
 CreateSegment -dataDir /Users/host1/Desktop/test/ -format CSV -outDir 
/Users/host1/Desktop/test2/ -tableName baseballStats -segmentName 
baseballStats_data -overwrite -schemaFile 
./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/sample_data/baseballStats_schema.json
-  Executing command: CreateSegment  -generatorConfigFile null -dataDir 
/Users/host1/Desktop/test/ -format CSV -outDir /Users/host1/Desktop/test2/ 
-overwrite true -tableName baseballStats -segmentName baseballStats_data 
-timeColumnName null -schemaFile 
./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/sample_data/baseballStats_schema.json
 -readerConfigFile null -enableStarTreeIndex false -starTreeIndexSpecFile null 
-hllSize 9 [...]
+  $ 
./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/bin/pinot-admin.sh
 CreateSegment -dataDir /Users/host1/Desktop/test/ -format CSV -outDir 
/Users/host1/Desktop/test2/ -tableName baseballStats -segmentName 
baseballStats_data -overwrite -schemaFile 
./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/examples/batch/baseballStats/baseballStats_schema.json
+  Executing command: CreateSegment  -generatorConfigFile null -dataDir 
/Users/host1/Desktop/test/ -format CSV -outDir /Users/host1/Desktop/test2/ 
-overwrite true -tableName baseballStats -segmentName baseballStats_data 
-timeColumnName null -schemaFile 
./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/examples/batch/baseballStats/baseballStats_schema.json
 -readerConfigFile null -enableStarTreeIndex false -starTreeIndexSpecFil [...]
   Accepted files: [/Users/host1/Desktop/test/baseballStats_data.csv]
   Finished building StatsCollector!
   Collected stats for 97889 documents
diff --git a/kubernetes/helm/pinot-realtime-quickstart.yml 
b/kubernetes/helm/pinot-realtime-quickstart.yml
index 8e60658..e5a613a 100644
--- a/kubernetes/helm/pinot-realtime-quickstart.yml
+++ b/kubernetes/helm/pinot-realtime-quickstart.yml
@@ -497,10 +497,10 @@ spec:
       containers:
         - name: loading-json-data-to-kafka
           image: fx19880617/pinot:0.2.0-SNAPSHOT
-          args: [ "StreamAvroIntoKafka", "-avroFile", 
"sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime", 
"-kafkaBrokerList", "kafka:9092", "-zkAddress", "kafka-zookeeper:2181" ]
+          args: [ "StreamAvroIntoKafka", "-avroFile", 
"examples/stream/airlineStats/sample_data/airlineStats_data.avro", 
"-kafkaTopic", "flights-realtime", "-kafkaBrokerList", "kafka:9092", 
"-zkAddress", "kafka-zookeeper:2181" ]
         - name: loading-avro-data-to-kafka
           image: fx19880617/pinot:0.2.0-SNAPSHOT
-          args: [ "StreamAvroIntoKafka", "-avroFile", 
"sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime-avro", 
"-kafkaBrokerList", "kafka:9092", "-zkAddress", "kafka-zookeeper:2181", 
"-outputFormat", "avro" ]
+          args: [ "StreamAvroIntoKafka", "-avroFile", 
"examples/stream/airlineStats/sample_data/airlineStats_data.avro", 
"-kafkaTopic", "flights-realtime-avro", "-kafkaBrokerList", "kafka:9092", 
"-zkAddress", "kafka-zookeeper:2181", "-outputFormat", "avro" ]
       restartPolicy: OnFailure
   backoffLimit: 3
 
diff --git a/kubernetes/skaffold/gke/pinot-realtime-quickstart.yml 
b/kubernetes/skaffold/gke/pinot-realtime-quickstart.yml
index 7de21bd..cd39c22 100644
--- a/kubernetes/skaffold/gke/pinot-realtime-quickstart.yml
+++ b/kubernetes/skaffold/gke/pinot-realtime-quickstart.yml
@@ -28,10 +28,10 @@ spec:
       containers:
         - name: loading-data-to-kafka
           image: winedepot/pinot:0.3.0-SNAPSHOT
-          args: [ "StreamAvroIntoKafka", "-avroFile", 
"sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime", 
"-kafkaBrokerList", "kafka:9092", "-zkAddress", "zookeeper:2181" ]
+          args: [ "StreamAvroIntoKafka", "-avroFile", 
"examples/stream/airlineStats/sample_data/airlineStats_data.avro", 
"-kafkaTopic", "flights-realtime", "-kafkaBrokerList", "kafka:9092", 
"-zkAddress", "zookeeper:2181" ]
         - name: pinot-add-example-realtime-table
           image: winedepot/pinot:0.3.0-SNAPSHOT
-          args: [ "AddTable", "-schemaFile", 
"sample_data/airlineStats_schema.json", "-tableConfigFile", 
"sample_data/docker/airlineStats_realtime_table_config.json", 
"-controllerHost", "pinot-controller", "-controllerPort", "9000", "-exec" ]
+          args: [ "AddTable", "-schemaFile", 
"examples/stream/airlineStats/airlineStats_schema.json", "-tableConfigFile", 
"examples/stream/airlineStats/docker/airlineStats_realtime_table_config.json", 
"-controllerHost", "pinot-controller", "-controllerPort", "9000", "-exec" ]
       restartPolicy: OnFailure
       nodeSelector:
         cloud.google.com/gke-nodepool: default-pool
diff --git a/pinot-distribution/pinot-assembly.xml 
b/pinot-distribution/pinot-assembly.xml
index a95e93b..9e7abcc 100644
--- a/pinot-distribution/pinot-assembly.xml
+++ b/pinot-distribution/pinot-assembly.xml
@@ -83,18 +83,13 @@
     </fileSet>
     <fileSet>
       <useDefaultExcludes>false</useDefaultExcludes>
-      
<directory>${pinot.root}/pinot-tools/src/main/resources/sample_data/kafka_${kafka.version}</directory>
-      <outputDirectory>sample_data/</outputDirectory>
+      
<directory>${pinot.root}/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_${kafka.version}</directory>
+      <outputDirectory>examples/stream/meetupRsvp/</outputDirectory>
     </fileSet>
     <fileSet>
       <useDefaultExcludes>false</useDefaultExcludes>
-      
<directory>${pinot.root}/pinot-tools/src/main/resources/sample_data</directory>
-      <outputDirectory>sample_data/</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <useDefaultExcludes>false</useDefaultExcludes>
-      
<directory>${pinot.root}/pinot-tools/src/main/resources/sample_data_realtime</directory>
-      <outputDirectory>sample_data</outputDirectory>
+      
<directory>${pinot.root}/pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_${kafka.version}</directory>
+      <outputDirectory>examples/stream/airlineStats/</outputDirectory>
     </fileSet>
     <fileSet>
       <useDefaultExcludes>false</useDefaultExcludes>
@@ -117,11 +112,16 @@
         <exclude>**/pinot-file-system/pinot-file-system/**</exclude>
         <exclude>**/pinot-input-format/pinot-input-format/**</exclude>
         <exclude>**/pinot-stream-ingestion/pinot-stream-ingestion/**</exclude>
-        <exclude>**/pinot-stream-ingestion/pinot-kafka-base/**</exclude>
+        <exclude>**/pinot-stream-ingestion/pinot-kafka-*/**</exclude>
         <exclude>**/pinot-batch-ingestion/pinot-batch-ingestion/**</exclude>
         <exclude>**/pinot-batch-ingestion/pinot-ingestion-common/**</exclude>
         <exclude>**/pinot-batch-ingestion/v0_deprecated/**</exclude>
       </excludes>
     </fileSet>
+    <fileSet>
+      <useDefaultExcludes>false</useDefaultExcludes>
+      
<directory>${pinot.root}/pinot-plugins/target/plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}</directory>
+      
<outputDirectory>plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}</outputDirectory>
+    </fileSet>
   </fileSets>
 </assembly>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
index a484b3a..164c2cd 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
@@ -65,11 +65,11 @@
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
-      <version>2.12.8</version>
+      <version>2.11.11</version>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.12</artifactId>
+      <artifactId>kafka_2.11</artifactId>
       <version>${kafka.lib.version}</version>
       <exclusions>
         <exclusion>
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
index 9b0b9d1..cc75d46 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
@@ -61,7 +61,7 @@ public class KafkaJSONMessageDecoder implements 
StreamMessageDecoder<byte[]> {
       }
       return destination;
     } catch (Exception e) {
-      LOGGER.error("Caught exception while decoding row, discarding row.", e);
+      LOGGER.error("Caught exception while decoding row, discarding row. 
Payload is {}", new String(payload), e);
       return null;
     }
   }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index aa33073..ce18bdc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -45,6 +45,7 @@ public class HybridQuickstart {
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private File _schemaFile;
   private File _dataFile;
+  private File _ingestionJobSpecFile;
 
   private HybridQuickstart() {
   }
@@ -66,21 +67,21 @@ public class HybridQuickstart {
     }
 
     _schemaFile = new File(_offlineQuickStartDataDir, 
"airlineStats_schema.json");
-    _dataFile = new File(_offlineQuickStartDataDir, "airlineStats_data.avro");
+    _ingestionJobSpecFile = new File(_offlineQuickStartDataDir, 
"ingestionJobSpec.yaml");
     File tableConfigFile = new File(_offlineQuickStartDataDir, 
"airlineStats_offline_table_config.json");
 
     ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = 
classLoader.getResource("sample_data/airlineStats_schema.json");
+    URL resource = 
classLoader.getResource("examples/batch/airlineStats/airlineStats_schema.json");
     Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, _schemaFile);
-    resource = classLoader.getResource("sample_data/airlineStats_data.avro");
+    resource = 
classLoader.getResource("examples/batch/airlineStats/ingestionJobSpec.yaml");
     Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, _dataFile);
-    resource = 
classLoader.getResource("sample_data/airlineStats_offline_table_config.json");
+    FileUtils.copyURLToFile(resource, _ingestionJobSpecFile);
+    resource = 
classLoader.getResource("examples/batch/airlineStats/airlineStats_offline_table_config.json");
     Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
 
-    return new QuickstartTableRequest("airlineStats", _schemaFile, 
tableConfigFile, _offlineQuickStartDataDir,
+    return new QuickstartTableRequest("airlineStats", _schemaFile, 
tableConfigFile, _ingestionJobSpecFile, _offlineQuickStartDataDir,
         FileFormat.AVRO);
   }
 
@@ -92,12 +93,17 @@ public class HybridQuickstart {
       Preconditions.checkState(_realtimeQuickStartDataDir.mkdirs());
     }
 
+    _dataFile = new File(_realtimeQuickStartDataDir, "airlineStats_data.avro");
     File tableConfigFile = new File(_realtimeQuickStartDataDir, 
"airlineStats_realtime_table_config.json");
 
     URL resource = Quickstart.class.getClassLoader().getResource(
-        "sample_data/airlineStats_realtime_table_config.json");
+        
"examples/stream/airlineStats/airlineStats_realtime_table_config.json");
     Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
+    resource = Quickstart.class.getClassLoader().getResource(
+        "examples/stream/airlineStats/sample_data/airlineStats_data.avro");
+    Preconditions.checkNotNull(resource);
+    FileUtils.copyURLToFile(resource, _dataFile);
 
     return new QuickstartTableRequest("airlineStats", _schemaFile, 
tableConfigFile);
   }
@@ -110,7 +116,7 @@ public class HybridQuickstart {
       throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
     }
     _kafkaStarter.start();
-    _kafkaStarter.createTopic("airlineStatsEvents", 
KafkaStarterUtils.getTopicCreationProps(10));
+    _kafkaStarter.createTopic("flights-realtime", 
KafkaStarterUtils.getTopicCreationProps(10));
   }
 
   public void execute()
@@ -132,12 +138,11 @@ public class HybridQuickstart {
     runner.createBrokerTenantWith(2, "airline_broker");
     printStatus(Color.YELLOW, "***** Adding airlineStats offline and realtime 
table *****");
     runner.addTable();
-    printStatus(Color.YELLOW, "***** Building index segment for airlineStats 
*****");
-    runner.buildSegment();
-    printStatus(Color.YELLOW, "***** Pushing segment to the controller *****");
-    runner.pushSegment();
+    printStatus(Color.YELLOW, "***** Launch data ingestion job to build index 
segments for airlineStats and push to controller *****");
+    runner.launchDataIngestionJob();
 
     printStatus(Color.YELLOW, "***** Starting airline data stream and 
publishing to Kafka *****");
+
     final AirlineDataStream stream = new 
AirlineDataStream(Schema.fromFile(_schemaFile), _dataFile);
     stream.run();
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
index 9427b52..b0662e9 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
@@ -126,32 +126,34 @@ public class Quickstart {
     File schemaFile = new File(quickStartDataDir, "baseballStats_schema.json");
     File dataFile = new File(quickStartDataDir, "baseballStats_data.csv");
     File tableConfigFile = new File(quickStartDataDir, 
"baseballStats_offline_table_config.json");
+    File ingestionJobSpecFile = new File(quickStartDataDir, 
"ingestionJobSpec.yaml");
 
     ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = 
classLoader.getResource("sample_data/baseballStats_schema.json");
+    URL resource = 
classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json");
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, schemaFile);
-    resource = classLoader.getResource("sample_data/baseballStats_data.csv");
+    resource = 
classLoader.getResource("examples/batch/baseballStats/rawdata/baseballStats_data.csv");
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, dataFile);
-    resource = 
classLoader.getResource("sample_data/baseballStats_offline_table_config.json");
+    resource = 
classLoader.getResource("examples/batch/baseballStats/ingestionJobSpec.yaml");
+    com.google.common.base.Preconditions.checkNotNull(resource);
+    FileUtils.copyURLToFile(resource, ingestionJobSpecFile);
+    resource = 
classLoader.getResource("examples/batch/baseballStats/baseballStats_offline_table_config.json");
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
 
     File tempDir = new File("/tmp", 
String.valueOf(System.currentTimeMillis()));
     Preconditions.checkState(tempDir.mkdirs());
     QuickstartTableRequest request =
-        new QuickstartTableRequest("baseballStats", schemaFile, 
tableConfigFile, quickStartDataDir, FileFormat.CSV);
+        new QuickstartTableRequest("baseballStats", schemaFile, 
tableConfigFile, ingestionJobSpecFile, quickStartDataDir, FileFormat.CSV);
     final QuickstartRunner runner = new 
QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, tempDir);
 
     printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and 
server *****");
     runner.startAll();
     printStatus(Color.CYAN, "***** Adding baseballStats table *****");
     runner.addTable();
-    printStatus(Color.CYAN, "***** Building index segment for baseballStats 
*****");
-    runner.buildSegment();
-    printStatus(Color.CYAN, "***** Pushing segment to the controller *****");
-    runner.pushSegment();
+    printStatus(Color.CYAN, "***** Launch data ingestion job to build index 
segment for baseballStats and push to controller *****");
+    runner.launchDataIngestionJob();
     printStatus(Color.CYAN, "***** Waiting for 5 seconds for the server to 
fetch the assigned segment *****");
     Thread.sleep(5000);
 
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
index 925be5b..9ead3d2 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
@@ -27,12 +27,13 @@ public class QuickstartTableRequest {
 
   File schemaFile;
   File tableRequestFile;
+  File ingestionJobFile;
   File dataDir;
   TableType tableType;
   String tableName;
   FileFormat segmentFileFormat = FileFormat.CSV;
 
-  public QuickstartTableRequest(String tableName, File schemaFile, File 
tableRequest, File dataDir,
+  public QuickstartTableRequest(String tableName, File schemaFile, File 
tableRequest, File ingestionJobFile, File dataDir,
       FileFormat segmentFileFormat) {
     this.tableName = tableName;
     this.schemaFile = schemaFile;
@@ -40,6 +41,7 @@ public class QuickstartTableRequest {
     this.tableRequestFile = tableRequest;
     tableType = TableType.OFFLINE;
     this.segmentFileFormat = segmentFileFormat;
+    this.ingestionJobFile = ingestionJobFile;
   }
 
   public QuickstartTableRequest(String tableName, File schemaFile, File 
tableRequest) {
@@ -73,6 +75,14 @@ public class QuickstartTableRequest {
     this.tableRequestFile = tableRequestFile;
   }
 
+  public File getIngestionJobFile() {
+    return ingestionJobFile;
+  }
+
+  public void setIngestionJobFile(File ingestionJobFile) {
+    this.ingestionJobFile = ingestionJobFile;
+  }
+
   public File getDataDir() {
     return dataDir;
   }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 82f76e2..c5dc44c 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -60,10 +60,10 @@ public class RealtimeQuickStart {
     File tableConfigFile = new File(quickStartDataDir, 
"meetupRsvp_realtime_table_config.json");
 
     ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = 
classLoader.getResource("sample_data/meetupRsvp_schema.json");
+    URL resource = 
classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_schema.json");
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, schemaFile);
-    resource = 
classLoader.getResource("sample_data/meetupRsvp_realtime_table_config.json");
+    resource = 
classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json");
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
 
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index 549ac52..0e97e24 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
+import org.apache.pinot.spi.batch.ingestion.IngestionJobLauncher;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.common.utils.TenantRole;
 import org.apache.pinot.tools.QuickstartTableRequest;
@@ -162,29 +163,15 @@ public class QuickstartRunner {
     }
   }
 
-  public void buildSegment()
+  public void launchDataIngestionJob()
       throws Exception {
     for (QuickstartTableRequest request : _tableRequests) {
       if (request.getTableType() == TableType.OFFLINE) {
-        File tempDir = new File(_tempDir, request.getTableName() + "_segment");
-        new 
CreateSegmentCommand().setDataDir(request.getDataDir().getAbsolutePath())
-            
.setFormat(request.getSegmentFileFormat()).setSchemaFile(request.getSchemaFile().getAbsolutePath())
-            .setTableName(request.getTableName())
-            .setSegmentName(request.getTableName() + "_" + 
System.currentTimeMillis())
-            .setOutDir(tempDir.getAbsolutePath()).execute();
-        _segmentDirs.add(tempDir.getAbsolutePath());
+        IngestionJobLauncher.main(new 
String[]{request.getIngestionJobFile().getAbsolutePath()});
       }
     }
   }
 
-  public void pushSegment()
-      throws Exception {
-    for (String segmentDir : _segmentDirs) {
-      new 
UploadSegmentCommand().setControllerPort(String.valueOf(_controllerPorts.get(0))).setSegmentDir(segmentDir)
-          .execute();
-    }
-  }
-
   public JsonNode runQuery(String query)
       throws Exception {
     int brokerPort = _brokerPorts.get(RANDOM.nextInt(_brokerPorts.size()));
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 139f108..43600ac 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -31,8 +31,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.tools.Command;
@@ -68,7 +68,6 @@ public class StreamAvroIntoKafkaCommand extends 
AbstractBaseAdminCommand impleme
   @Option(name = "-millisBetweenMessages", required = false, metaVar = 
"<int>", usage = "Delay in milliseconds between messages (default 1000 ms)")
   private String _millisBetweenMessages = "1000";
 
-
   @Override
   public boolean getHelp() {
     return _help;
@@ -113,9 +112,11 @@ public class StreamAvroIntoKafkaCommand extends 
AbstractBaseAdminCommand impleme
 
     StreamDataProducer streamDataProducer;
     try {
-      streamDataProducer = 
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
 properties);
+      streamDataProducer =
+          
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
 properties);
     } catch (Exception e) {
-      throw new RuntimeException("Failed to get StreamDataProducer - " + 
KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, e);
+      throw new RuntimeException("Failed to get StreamDataProducer - " + 
KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
+          e);
     }
     try {
       // Open the Avro file
@@ -154,4 +155,29 @@ public class StreamAvroIntoKafkaCommand extends 
AbstractBaseAdminCommand impleme
     savePID(System.getProperty("java.io.tmpdir") + File.separator + 
".streamAvro.pid");
     return true;
   }
+
+  public StreamAvroIntoKafkaCommand setAvroFile(String avroFile) {
+    _avroFile = avroFile;
+    return this;
+  }
+
+  public StreamAvroIntoKafkaCommand setKafkaBrokerList(String kafkaBrokerList) 
{
+    _kafkaBrokerList = kafkaBrokerList;
+    return this;
+  }
+
+  public StreamAvroIntoKafkaCommand setKafkaTopic(String kafkaTopic) {
+    _kafkaTopic = kafkaTopic;
+    return this;
+  }
+
+  public StreamAvroIntoKafkaCommand setZkAddress(String zkAddress) {
+    _zkAddress = zkAddress;
+    return this;
+  }
+
+  public StreamAvroIntoKafkaCommand setOutputFormat(String outputFormat) {
+    _outputFormat = outputFormat;
+    return this;
+  }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index d2aef53..cf483d3 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.tools.streams;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -28,12 +26,13 @@ import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.tools.Quickstart;
@@ -42,6 +41,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * This is used in Hybrid Quickstart.
+ */
 public class AirlineDataStream {
   private static final Logger logger = 
LoggerFactory.getLogger(AirlineDataStream.class);
 
@@ -89,14 +91,14 @@ public class AirlineDataStream {
     avroDataStream = null;
   }
 
-  private void publish(JsonNode message)
+  private void publish(GenericRecord message)
       throws IOException {
     if (!keepIndexing) {
       avroDataStream.close();
       avroDataStream = null;
       return;
     }
-    producer.produce("airlineStatsEvents", 
message.toString().getBytes("UTF-8"));
+    producer.produce("flights-realtime", message.toString().getBytes("UTF-8"));
   }
 
   public void run() {
@@ -112,14 +114,15 @@ public class AirlineDataStream {
             }
 
             GenericRecord record = avroDataStream.next();
-            ObjectNode message = JsonUtils.newObjectNode();
+
+            GenericRecord message = new 
GenericData.Record(AvroUtils.getAvroSchemaFromPinotSchema(pinotSchema));
 
             for (FieldSpec spec : pinotSchema.getDimensionFieldSpecs()) {
-              message.set(spec.getName(), 
JsonUtils.objectToJsonNode(record.get(spec.getName())));
+              message.put(spec.getName(), record.get(spec.getName()));
             }
 
-            for (FieldSpec spec : pinotSchema.getDimensionFieldSpecs()) {
+            for (FieldSpec spec : pinotSchema.getMetricFieldSpecs()) {
-              message.set(spec.getName(), 
JsonUtils.objectToJsonNode(record.get(spec.getName())));
+              message.put(spec.getName(), record.get(spec.getName()));
             }
 
             TimeFieldSpec spec = pinotSchema.getTimeFieldSpec();
diff --git 
a/pinot-tools/src/main/resources/sample_data/baseballStats_offline_table_config.json
 
b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
similarity index 100%
rename from 
pinot-tools/src/main/resources/sample_data/baseballStats_offline_table_config.json
rename to 
pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
diff --git 
a/pinot-tools/src/main/resources/sample_data/baseballStats_schema.json 
b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_schema.json
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/baseballStats_schema.json
rename to 
pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_schema.json
diff --git 
a/pinot-tools/src/main/resources/examples/batch/baseballStats/ingestionJobSpec.yaml
 
b/pinot-tools/src/main/resources/examples/batch/baseballStats/ingestionJobSpec.yaml
new file mode 100644
index 0000000..83aca94
--- /dev/null
+++ 
b/pinot-tools/src/main/resources/examples/batch/baseballStats/ingestionJobSpec.yaml
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# executionFrameworkSpec: Defines the execution framework used to run ingestion jobs.
+executionFrameworkSpec:
+
+  # name: execution framework name
+  name: 'standalone'
+
+  # segmentGenerationJobRunnerClassName: class name that implements the 
org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner 
interface.
+  segmentGenerationJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
+
+  # segmentTarPushJobRunnerClassName: class name that implements the 
org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
+  segmentTarPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
+
+  # segmentUriPushJobRunnerClassName: class name that implements the 
org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
+  segmentUriPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
+
+# jobType: Pinot ingestion job type.
+# Supported job types are:
+#   'SegmentCreation'
+#   'SegmentTarPush'
+#   'SegmentUriPush'
+#   'SegmentCreationAndTarPush'
+#   'SegmentCreationAndUriPush'
+jobType: SegmentCreationAndTarPush
+
+# inputDirURI: Root directory of input data, expected to have scheme 
configured in PinotFS.
+inputDirURI: 'examples/batch/baseballStats/rawdata'
+
+# includeFileNamePattern: include file name pattern, supported glob pattern.
+# Sample usage:
+#   'glob:*.avro' will include all avro files just under the inputDirURI, not 
sub directories;
+#   'glob:**\/*.avro' will include all the avro files under inputDirURI 
recursively.
+includeFileNamePattern: 'glob:**/*.csv'
+
+# excludeFileNamePattern: exclude file name pattern, supported glob pattern.
+# Sample usage:
+#   'glob:*.avro' will exclude all avro files just under the inputDirURI, not 
sub directories;
+#   'glob:**\/*.avro' will exclude all the avro files under inputDirURI 
recursively.
+# _excludeFileNamePattern: ''
+
+# outputDirURI: Root directory of output segments, expected to have scheme 
configured in PinotFS.
+outputDirURI: 'examples/batch/baseballStats/segments'
+
+# overwriteOutput: Overwrite output segments if existed.
+overwriteOutput: true
+
+# pinotFSSpecs: defines all related Pinot file systems.
+pinotFSSpecs:
+
+  - # scheme: used to identify a PinotFS.
+    # E.g. local, hdfs, dbfs, etc
+    scheme: file
+
+    # className: Class name used to create the PinotFS instance.
+    # E.g.
+    #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local 
filesystem
+    #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data 
Lake
+    #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
+    className: org.apache.pinot.spi.filesystem.LocalPinotFS
+
+# recordReaderSpec: defines all record reader
+recordReaderSpec:
+
+  # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 
'json', 'thrift' etc.
+  dataFormat: 'csv'
+
+  # className: Corresponding RecordReader class name.
+  # E.g.
+  #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
+  #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
+  #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
+  #   org.apache.pinot.plugin.inputformat.json.JsonRecordReader
+  #   org.apache.pinot.plugin.inputformat.orc.OrcRecordReader
+  #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
+  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
+
+  # configClassName: Corresponding RecordReaderConfig class name, it's 
mandatory for CSV and Thrift file format.
+  # E.g.
+  #    org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig
+  #    org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig
+  configClassName: 
'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
+
+  # configs: Used to init RecordReaderConfig class name, this config is 
required for CSV and Thrift data format.
+  configs:
+
+
+# tableSpec: defines table name and where to fetch corresponding table config 
and table schema.
+tableSpec:
+
+  # tableName: Table name
+  tableName: 'baseballStats'
+
+  # schemaURI: defines where to read the table schema, supports PinotFS or 
HTTP.
+  # E.g.
+  #   hdfs://path/to/table_schema.json
+  #   http://localhost:9000/tables/myTable/schema
+  schemaURI: 'http://localhost:9000/tables/baseballStats/schema'
+
+  # tableConfigURI: defines where to read the table config.
+  # Supports using PinotFS or HTTP.
+  # E.g.
+  #   hdfs://path/to/table_config.json
+  #   http://localhost:9000/tables/myTable
+  # Note that the API to read Pinot table config directly from pinot 
controller contains a JSON wrapper.
+  # The real table config is the object under the field 'OFFLINE'.
+  tableConfigURI: 'http://localhost:9000/tables/baseballStats'
+
+# pinotClusterSpecs: defines the Pinot Cluster Access Point.
+pinotClusterSpecs:
+  - # controllerURI: used to fetch table/schema information and data push.
+    # E.g. http://localhost:9000
+    controllerURI: 'http://localhost:9000'
+
+# pushJobSpec: defines segment push job related configuration.
+pushJobSpec:
+
+  # pushAttempts: number of attempts for push job, default is 1, which means 
no retry.
+  pushAttempts: 2
+
+  # pushRetryIntervalMillis: retry wait time in milliseconds, defaults to 1 
second (1000 ms).
diff --git a/pinot-tools/src/main/resources/sample_data/baseballStats_data.csv 
b/pinot-tools/src/main/resources/examples/batch/baseballStats/rawdata/baseballStats_data.csv
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/baseballStats_data.csv
rename to 
pinot-tools/src/main/resources/examples/batch/baseballStats/rawdata/baseballStats_data.csv
diff --git 
a/pinot-tools/src/main/resources/sample_data/kafka_2.0/airlineStats_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
similarity index 100%
copy from 
pinot-tools/src/main/resources/sample_data/kafka_2.0/airlineStats_realtime_table_config.json
copy to 
pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
diff --git 
a/pinot-tools/src/main/resources/sample_data/airlineStats_schema.json 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/airlineStats_schema.json
rename to 
pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
diff --git 
a/pinot-tools/src/main/resources/sample_data/docker/airlineStats_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/docker/airlineStats_realtime_table_config.json
similarity index 100%
rename from 
pinot-tools/src/main/resources/sample_data/docker/airlineStats_realtime_table_config.json
rename to 
pinot-tools/src/main/resources/examples/stream/airlineStats/docker/airlineStats_realtime_table_config.json
diff --git 
a/pinot-tools/src/main/resources/sample_data/kafka_0.9/airlineStats_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_0.9/airlineStats_realtime_table_config.json
similarity index 91%
rename from 
pinot-tools/src/main/resources/sample_data/kafka_0.9/airlineStats_realtime_table_config.json
rename to 
pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_0.9/airlineStats_realtime_table_config.json
index 4ea745e..f5c4652 100644
--- 
a/pinot-tools/src/main/resources/sample_data/kafka_0.9/airlineStats_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_0.9/airlineStats_realtime_table_config.json
@@ -23,6 +23,7 @@
       "stream.kafka.consumer.type": "simple",
       "stream.kafka.topic.name": "flights-realtime",
       "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+      "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory",
       "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
       "stream.kafka.zk.broker.url": "localhost:2191/kafka",
       "stream.kafka.broker.list": "localhost:19092",
diff --git 
a/pinot-tools/src/main/resources/sample_data/kafka_2.0/airlineStats_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_2.0/airlineStats_realtime_table_config.json
similarity index 100%
rename from 
pinot-tools/src/main/resources/sample_data/kafka_2.0/airlineStats_realtime_table_config.json
rename to 
pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_2.0/airlineStats_realtime_table_config.json
diff --git a/pinot-tools/src/main/resources/sample_data/airlineStats_data.avro 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.avro
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/airlineStats_data.avro
rename to 
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.avro
diff --git a/pinot-tools/src/main/resources/sample_data/airlineStats_data.json 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.json
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/airlineStats_data.json
rename to 
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.json
diff --git a/pinot-tools/src/main/resources/sample_data/airlineStats_data.orc 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.orc
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/airlineStats_data.orc
rename to 
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.orc
diff --git 
a/pinot-tools/src/main/resources/sample_data/kafka_0.9/meetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
similarity index 87%
rename from 
pinot-tools/src/main/resources/sample_data/kafka_0.9/meetupRsvp_realtime_table_config.json
rename to 
pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
index 9b145bc..8d00c37 100644
--- 
a/pinot-tools/src/main/resources/sample_data/kafka_0.9/meetupRsvp_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
@@ -19,7 +19,8 @@
       "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
       "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
       "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory",
-      "stream.kafka.zk.broker.url": "localhost:2191/kafka"
+      "stream.kafka.zk.broker.url": "localhost:2191/kafka",
+      "stream.kafka.hlc.bootstrap.server": "localhost:19092"
     }
   },
   "metadata": {
diff --git 
a/pinot-tools/src/main/resources/sample_data/kafka_2.0/meetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
similarity index 100%
copy from 
pinot-tools/src/main/resources/sample_data/kafka_2.0/meetupRsvp_realtime_table_config.json
copy to 
pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
diff --git 
a/pinot-tools/src/main/resources/sample_data/kafka_2.0/meetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json
similarity index 100%
rename from 
pinot-tools/src/main/resources/sample_data/kafka_2.0/meetupRsvp_realtime_table_config.json
rename to 
pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json
diff --git a/pinot-tools/src/main/resources/sample_data/meetupRsvp_schema.json 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/meetupRsvp_schema.json
rename to 
pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json
diff --git 
a/pinot-tools/src/main/resources/sample_data/airlineStats_offline_table_config.json
 
b/pinot-tools/src/main/resources/sample_data/airlineStats_offline_table_config.json
deleted file mode 100644
index de73b4c..0000000
--- 
a/pinot-tools/src/main/resources/sample_data/airlineStats_offline_table_config.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
-  "tableName": "airlineStats",
-  "tableType": "OFFLINE",
-  "segmentsConfig": {
-    "timeColumnName": "DaysSinceEpoch",
-    "timeType": "DAYS",
-    "segmentPushType": "APPEND",
-    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
-    "replication": "1"
-  },
-  "tenants": {
-    "broker": "airline_broker",
-    "server": "airline"
-  },
-  "tableIndexConfig": {
-    "loadMode": "MMAP"
-  },
-  "metadata": {
-    "customConfigs": {}
-  }
-}
diff --git 
a/pinot-tools/src/main/resources/sample_data/airlineStats_realtime_table_config.json
 
b/pinot-tools/src/main/resources/sample_data/airlineStats_realtime_table_config.json
deleted file mode 100644
index 4ea745e..0000000
--- 
a/pinot-tools/src/main/resources/sample_data/airlineStats_realtime_table_config.json
+++ /dev/null
@@ -1,37 +0,0 @@
-{
-  "tableName": "airlineStats",
-  "tableType": "REALTIME",
-  "segmentsConfig": {
-    "timeColumnName": "DaysSinceEpoch",
-    "timeType": "DAYS",
-    "retentionTimeUnit": "DAYS",
-    "retentionTimeValue": "5",
-    "segmentPushType": "APPEND",
-    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
-    "schemaName": "airlineStats",
-    "replication": "1",
-    "replicasPerPartition": "1"
-  },
-  "tenants": {
-    "broker": "airline_broker",
-    "server": "airline"
-  },
-  "tableIndexConfig": {
-    "loadMode": "MMAP",
-    "streamConfigs": {
-      "streamType": "kafka",
-      "stream.kafka.consumer.type": "simple",
-      "stream.kafka.topic.name": "flights-realtime",
-      "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
-      "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
-      "stream.kafka.zk.broker.url": "localhost:2191/kafka",
-      "stream.kafka.broker.list": "localhost:19092",
-      "realtime.segment.flush.threshold.time": "3600000",
-      "realtime.segment.flush.threshold.size": "50000",
-      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
-    }
-  },
-  "metadata": {
-    "customConfigs": {}
-  }
-}
diff --git 
a/pinot-tools/src/main/resources/sample_data/docker/airlineStats_offline_table_config.json
 
b/pinot-tools/src/main/resources/sample_data/docker/airlineStats_offline_table_config.json
deleted file mode 100644
index de73b4c..0000000
--- 
a/pinot-tools/src/main/resources/sample_data/docker/airlineStats_offline_table_config.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
-  "tableName": "airlineStats",
-  "tableType": "OFFLINE",
-  "segmentsConfig": {
-    "timeColumnName": "DaysSinceEpoch",
-    "timeType": "DAYS",
-    "segmentPushType": "APPEND",
-    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
-    "replication": "1"
-  },
-  "tenants": {
-    "broker": "airline_broker",
-    "server": "airline"
-  },
-  "tableIndexConfig": {
-    "loadMode": "MMAP"
-  },
-  "metadata": {
-    "customConfigs": {}
-  }
-}
diff --git 
a/pinot-tools/src/main/resources/sample_data/meetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/sample_data/meetupRsvp_realtime_table_config.json
deleted file mode 100644
index 9b145bc..0000000
--- 
a/pinot-tools/src/main/resources/sample_data/meetupRsvp_realtime_table_config.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
-  "tableName": "meetupRsvp",
-  "tableType": "REALTIME",
-  "segmentsConfig": {
-    "timeColumnName": "mtime",
-    "timeType": "MILLISECONDS",
-    "segmentPushType": "APPEND",
-    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
-    "schemaName": "meetupRsvp",
-    "replication": "1"
-  },
-  "tenants": {},
-  "tableIndexConfig": {
-    "loadMode": "MMAP",
-    "streamConfigs": {
-      "streamType": "kafka",
-      "stream.kafka.consumer.type": "highLevel",
-      "stream.kafka.topic.name": "meetupRSVPEvents",
-      "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
-      "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
-      "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory",
-      "stream.kafka.zk.broker.url": "localhost:2191/kafka"
-    }
-  },
-  "metadata": {
-    "customConfigs": {}
-  }
-}
diff --git a/pom.xml b/pom.xml
index c152598..e9576c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,11 +137,23 @@
     kafka dependency is still explicitly defined in pinot-integration-tests, 
pinot-tools and pinot-perf pom files.
     To change kafka connector dependency, we only need to update this version 
number config.
     TODO: figure out a way to inject kafka dependency instead of explicitly 
setting the kafka module dependency -->
-    <kafka.version>0.9</kafka.version>
+    <kafka.version>2.0</kafka.version>
   </properties>
 
   <profiles>
     <profile>
+      <id>kafka-0.9</id>
+      <activation>
+        <property>
+          <name>kafka.version</name>
+          <value>0.9</value>
+        </property>
+      </activation>
+      <properties>
+        <kafka.version>0.9</kafka.version>
+      </properties>
+    </profile>
+    <profile>
       <id>kafka-2.0</id>
       <activation>
         <property>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to