This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ca0b590 Update quickstart and examples (#4970)
ca0b590 is described below
commit ca0b5906cb242269d30308f0ff1fdde73a12bcce
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Jan 11 16:17:43 2020 -0800
Update quickstart and examples (#4970)
* Move sample_data to examples
* Make Travis include kafka-2.0 in the default build
* Make the offline quickstart use launchDataIngestionJob instead of building a segment and then pushing it
* Fix the hybrid quickstart
---
.travis.yml | 8 +-
.travis_install.sh | 2 +-
.travis_test.sh | 2 +-
docker/images/pinot/README.md | 4 +-
docs/in_production.rst | 4 +-
kubernetes/helm/pinot-realtime-quickstart.yml | 4 +-
.../skaffold/gke/pinot-realtime-quickstart.yml | 4 +-
pinot-distribution/pinot-assembly.xml | 20 +--
.../pinot-stream-ingestion/pinot-kafka-2.0/pom.xml | 4 +-
.../stream/kafka/KafkaJSONMessageDecoder.java | 2 +-
.../org/apache/pinot/tools/HybridQuickstart.java | 29 +++--
.../java/org/apache/pinot/tools/Quickstart.java | 18 +--
.../apache/pinot/tools/QuickstartTableRequest.java | 12 +-
.../org/apache/pinot/tools/RealtimeQuickStart.java | 4 +-
.../tools/admin/command/QuickstartRunner.java | 19 +--
.../admin/command/StreamAvroIntoKafkaCommand.java | 34 ++++-
.../pinot/tools/streams/AirlineDataStream.java | 19 +--
.../baseballStats_offline_table_config.json | 0
.../batch/baseballStats}/baseballStats_schema.json | 0
.../batch/baseballStats/ingestionJobSpec.yaml | 139 +++++++++++++++++++++
.../baseballStats/rawdata}/baseballStats_data.csv | 0
.../airlineStats_realtime_table_config.json | 0
.../stream/airlineStats}/airlineStats_schema.json | 0
.../docker/airlineStats_realtime_table_config.json | 0
.../airlineStats_realtime_table_config.json | 1 +
.../airlineStats_realtime_table_config.json | 0
.../sample_data/airlineStats_data.avro | Bin
.../sample_data/airlineStats_data.json | 0
.../sample_data/airlineStats_data.orc | Bin
.../meetupRsvp_realtime_table_config.json | 3 +-
.../meetupRsvp_realtime_table_config.json | 0
.../meetupRsvp_realtime_table_config.json | 0
.../stream/meetupRsvp}/meetupRsvp_schema.json | 0
.../airlineStats_offline_table_config.json | 21 ----
.../airlineStats_realtime_table_config.json | 37 ------
.../docker/airlineStats_offline_table_config.json | 21 ----
.../meetupRsvp_realtime_table_config.json | 28 -----
pom.xml | 14 ++-
38 files changed, 266 insertions(+), 187 deletions(-)
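The file moves above follow one pattern: quickstart resources leave the flat `sample_data/` directory for per-table `examples/batch/<table>/` and `examples/stream/<table>/` trees. As a rough illustration only (the `sed` expression below is not part of this commit), the baseballStats schema rename recorded in this diff can be expressed as a path rewrite:

```shell
# Illustrative only: map the old flat sample_data path to the new
# per-table examples layout, matching the rename recorded in this diff.
old='sample_data/baseballStats_schema.json'
echo "$old" | sed 's#^sample_data/#examples/batch/baseballStats/#'
# -> examples/batch/baseballStats/baseballStats_schema.json
```

Stream-table resources move analogously under `examples/stream/<table>/`, with Kafka-version-specific configs kept in `kafka_${kafka.version}` subdirectories.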
diff --git a/.travis.yml b/.travis.yml
index 6750717..2fddfe1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -7,13 +7,13 @@ env:
     - JOBS=1
   matrix:
     - RUN_INTEGRATION_TESTS=true
-      KAFKA_VERSION=0.9
-    - RUN_INTEGRATION_TESTS=false
-      KAFKA_VERSION=0.9
-    - RUN_INTEGRATION_TESTS=true
       KAFKA_VERSION=2.0
     - RUN_INTEGRATION_TESTS=false
       KAFKA_VERSION=2.0
+    - RUN_INTEGRATION_TESTS=true
+      KAFKA_VERSION=0.9
+    - RUN_INTEGRATION_TESTS=false
+      KAFKA_VERSION=0.9
 jdk:
   - oraclejdk8
diff --git a/.travis_install.sh b/.travis_install.sh
index f6cd3a2..94990bf 100755
--- a/.travis_install.sh
+++ b/.travis_install.sh
@@ -39,7 +39,7 @@ if [ $noThirdEyeChange -eq 0 ]; then
fi
KAFKA_BUILD_OPTS=""
-if [ "$KAFKA_VERSION" != '0.9' ]; then
+if [ "$KAFKA_VERSION" != '2.0' ]; then
KAFKA_BUILD_OPTS="-Dkafka.version=${KAFKA_VERSION}"
fi
diff --git a/.travis_test.sh b/.travis_test.sh
index e3d3ad9..3ac82a2 100755
--- a/.travis_test.sh
+++ b/.travis_test.sh
@@ -58,7 +58,7 @@ fi
passed=0
KAFKA_BUILD_OPTS=""
-if [ "$KAFKA_VERSION" != '0.9' ]; then
+if [ "$KAFKA_VERSION" != '2.0' ]; then
git diff --name-only $TRAVIS_COMMIT_RANGE | egrep '^(pinot-connectors)'
if [ $? -ne 0 ]; then
echo "No Pinot Connector Changes, Skip tests for Kafka Connector: ${KAFKA_VERSION}."
diff --git a/docker/images/pinot/README.md b/docker/images/pinot/README.md
index 6d97b1f..f18aa79 100644
--- a/docker/images/pinot/README.md
+++ b/docker/images/pinot/README.md
@@ -131,12 +131,12 @@ docker-compose -f docker-compose.yml up
Below is the script to create airlineStats table
```SHELL
-docker run --network=docker_default winedepot/pinot:0.3.0-SNAPSHOT AddTable -schemaFile sample_data/airlineStats_schema.json -tableConfigFile sample_data/docker/airlineStats_realtime_table_config.json -controllerHost pinot-controller -controllerPort 9000 -exec
+docker run --network=docker_default winedepot/pinot:0.3.0-SNAPSHOT AddTable -schemaFile examples/stream/airlineStats/airlineStats_schema.json -tableConfigFile examples/stream/airlineStats/docker/airlineStats_realtime_table_config.json -controllerHost pinot-controller -controllerPort 9000 -exec
```
Below is the script to ingest airplane stats data to Kafka
```SHELL
-docker run --network=docker_default winedepot/pinot:0.3.0-SNAPSHOT StreamAvroIntoKafka -avroFile sample_data/airlineStats_data.avro -kafkaTopic flights-realtime -kafkaBrokerList kafka:9092 -zkAddress zookeeper:2181
+docker run --network=docker_default winedepot/pinot:0.3.0-SNAPSHOT StreamAvroIntoKafka -avroFile examples/stream/airlineStats/sample_data/airlineStats_data.avro -kafkaTopic flights-realtime -kafkaBrokerList kafka:9092 -zkAddress zookeeper:2181
```
In order to query pinot, try to open `localhost:9000/query` from your browser.
diff --git a/docs/in_production.rst b/docs/in_production.rst
index 08b0605..2b1e06e 100644
--- a/docs/in_production.rst
+++ b/docs/in_production.rst
@@ -76,8 +76,8 @@ Here is an example of invoking the command to create a pinot segment:
.. code-block:: none
- $ ./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/bin/pinot-admin.sh CreateSegment -dataDir /Users/host1/Desktop/test/ -format CSV -outDir /Users/host1/Desktop/test2/ -tableName baseballStats -segmentName baseballStats_data -overwrite -schemaFile ./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/sample_data/baseballStats_schema.json
- Executing command: CreateSegment -generatorConfigFile null -dataDir /Users/host1/Desktop/test/ -format CSV -outDir /Users/host1/Desktop/test2/ -overwrite true -tableName baseballStats -segmentName baseballStats_data -timeColumnName null -schemaFile ./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/sample_data/baseballStats_schema.json -readerConfigFile null -enableStarTreeIndex false -starTreeIndexSpecFile null -hllSize 9 [...]
+ $ ./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/bin/pinot-admin.sh CreateSegment -dataDir /Users/host1/Desktop/test/ -format CSV -outDir /Users/host1/Desktop/test2/ -tableName baseballStats -segmentName baseballStats_data -overwrite -schemaFile ./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/examples/batch/baseballStats/baseballStats_schema.json
+ Executing command: CreateSegment -generatorConfigFile null -dataDir /Users/host1/Desktop/test/ -format CSV -outDir /Users/host1/Desktop/test2/ -overwrite true -tableName baseballStats -segmentName baseballStats_data -timeColumnName null -schemaFile ./pinot-distribution/target/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/apache-pinot-incubating-0.1.0-SNAPSHOT-bin/examples/batch/baseballStats/baseballStats_schema.json -readerConfigFile null -enableStarTreeIndex false -starTreeIndexSpecFil [...]
Accepted files: [/Users/host1/Desktop/test/baseballStats_data.csv]
Finished building StatsCollector!
Collected stats for 97889 documents
diff --git a/kubernetes/helm/pinot-realtime-quickstart.yml b/kubernetes/helm/pinot-realtime-quickstart.yml
index 8e60658..e5a613a 100644
--- a/kubernetes/helm/pinot-realtime-quickstart.yml
+++ b/kubernetes/helm/pinot-realtime-quickstart.yml
@@ -497,10 +497,10 @@ spec:
containers:
- name: loading-json-data-to-kafka
image: fx19880617/pinot:0.2.0-SNAPSHOT
- args: [ "StreamAvroIntoKafka", "-avroFile", "sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime", "-kafkaBrokerList", "kafka:9092", "-zkAddress", "kafka-zookeeper:2181" ]
+ args: [ "StreamAvroIntoKafka", "-avroFile", "examples/stream/airlineStats/sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime", "-kafkaBrokerList", "kafka:9092", "-zkAddress", "kafka-zookeeper:2181" ]
- name: loading-avro-data-to-kafka
image: fx19880617/pinot:0.2.0-SNAPSHOT
- args: [ "StreamAvroIntoKafka", "-avroFile", "sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime-avro", "-kafkaBrokerList", "kafka:9092", "-zkAddress", "kafka-zookeeper:2181", "-outputFormat", "avro" ]
+ args: [ "StreamAvroIntoKafka", "-avroFile", "examples/stream/airlineStats/sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime-avro", "-kafkaBrokerList", "kafka:9092", "-zkAddress", "kafka-zookeeper:2181", "-outputFormat", "avro" ]
restartPolicy: OnFailure
backoffLimit: 3
diff --git a/kubernetes/skaffold/gke/pinot-realtime-quickstart.yml b/kubernetes/skaffold/gke/pinot-realtime-quickstart.yml
index 7de21bd..cd39c22 100644
--- a/kubernetes/skaffold/gke/pinot-realtime-quickstart.yml
+++ b/kubernetes/skaffold/gke/pinot-realtime-quickstart.yml
@@ -28,10 +28,10 @@ spec:
containers:
- name: loading-data-to-kafka
image: winedepot/pinot:0.3.0-SNAPSHOT
- args: [ "StreamAvroIntoKafka", "-avroFile", "sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime", "-kafkaBrokerList", "kafka:9092", "-zkAddress", "zookeeper:2181" ]
+ args: [ "StreamAvroIntoKafka", "-avroFile", "examples/stream/airlineStats/sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime", "-kafkaBrokerList", "kafka:9092", "-zkAddress", "zookeeper:2181" ]
- name: pinot-add-example-realtime-table
image: winedepot/pinot:0.3.0-SNAPSHOT
- args: [ "AddTable", "-schemaFile", "sample_data/airlineStats_schema.json", "-tableConfigFile", "sample_data/docker/airlineStats_realtime_table_config.json", "-controllerHost", "pinot-controller", "-controllerPort", "9000", "-exec" ]
+ args: [ "AddTable", "-schemaFile", "examples/stream/airlineStats/airlineStats_schema.json", "-tableConfigFile", "examples/stream/airlineStats/docker/airlineStats_realtime_table_config.json", "-controllerHost", "pinot-controller", "-controllerPort", "9000", "-exec" ]
restartPolicy: OnFailure
nodeSelector:
cloud.google.com/gke-nodepool: default-pool
diff --git a/pinot-distribution/pinot-assembly.xml b/pinot-distribution/pinot-assembly.xml
index a95e93b..9e7abcc 100644
--- a/pinot-distribution/pinot-assembly.xml
+++ b/pinot-distribution/pinot-assembly.xml
@@ -83,18 +83,13 @@
</fileSet>
<fileSet>
<useDefaultExcludes>false</useDefaultExcludes>
- <directory>${pinot.root}/pinot-tools/src/main/resources/sample_data/kafka_${kafka.version}</directory>
- <outputDirectory>sample_data/</outputDirectory>
+ <directory>${pinot.root}/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_${kafka.version}</directory>
+ <outputDirectory>examples/stream/meetupRsvp/</outputDirectory>
</fileSet>
<fileSet>
<useDefaultExcludes>false</useDefaultExcludes>
- <directory>${pinot.root}/pinot-tools/src/main/resources/sample_data</directory>
- <outputDirectory>sample_data/</outputDirectory>
- </fileSet>
- <fileSet>
- <useDefaultExcludes>false</useDefaultExcludes>
- <directory>${pinot.root}/pinot-tools/src/main/resources/sample_data_realtime</directory>
- <outputDirectory>sample_data</outputDirectory>
+ <directory>${pinot.root}/pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_${kafka.version}</directory>
+ <outputDirectory>examples/stream/airlineStats/</outputDirectory>
</fileSet>
<fileSet>
<useDefaultExcludes>false</useDefaultExcludes>
@@ -117,11 +112,16 @@
<exclude>**/pinot-file-system/pinot-file-system/**</exclude>
<exclude>**/pinot-input-format/pinot-input-format/**</exclude>
<exclude>**/pinot-stream-ingestion/pinot-stream-ingestion/**</exclude>
- <exclude>**/pinot-stream-ingestion/pinot-kafka-base/**</exclude>
+ <exclude>**/pinot-stream-ingestion/pinot-kafka-*/**</exclude>
<exclude>**/pinot-batch-ingestion/pinot-batch-ingestion/**</exclude>
<exclude>**/pinot-batch-ingestion/pinot-ingestion-common/**</exclude>
<exclude>**/pinot-batch-ingestion/v0_deprecated/**</exclude>
</excludes>
</fileSet>
+ <fileSet>
+ <useDefaultExcludes>false</useDefaultExcludes>
+ <directory>${pinot.root}/pinot-plugins/target/plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}</directory>
+ <outputDirectory>plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}</outputDirectory>
+ </fileSet>
</fileSets>
</assembly>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
index a484b3a..164c2cd 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
@@ -65,11 +65,11 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <version>2.12.8</version>
+ <version>2.11.11</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.12</artifactId>
+ <artifactId>kafka_2.11</artifactId>
<version>${kafka.lib.version}</version>
<exclusions>
<exclusion>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
index 9b0b9d1..cc75d46 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
@@ -61,7 +61,7 @@ public class KafkaJSONMessageDecoder implements StreamMessageDecoder<byte[]> {
}
return destination;
} catch (Exception e) {
- LOGGER.error("Caught exception while decoding row, discarding row.", e);
+ LOGGER.error("Caught exception while decoding row, discarding row. Payload is {}", new String(payload), e);
return null;
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index aa33073..ce18bdc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -45,6 +45,7 @@ public class HybridQuickstart {
private ZkStarter.ZookeeperInstance _zookeeperInstance;
private File _schemaFile;
private File _dataFile;
+ private File _ingestionJobSpecFile;
private HybridQuickstart() {
}
@@ -66,21 +67,21 @@ public class HybridQuickstart {
}
_schemaFile = new File(_offlineQuickStartDataDir, "airlineStats_schema.json");
- _dataFile = new File(_offlineQuickStartDataDir, "airlineStats_data.avro");
+ _ingestionJobSpecFile = new File(_offlineQuickStartDataDir, "ingestionJobSpec.yaml");
File tableConfigFile = new File(_offlineQuickStartDataDir, "airlineStats_offline_table_config.json");
ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource = classLoader.getResource("sample_data/airlineStats_schema.json");
+ URL resource = classLoader.getResource("examples/batch/airlineStats/airlineStats_schema.json");
Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, _schemaFile);
- resource = classLoader.getResource("sample_data/airlineStats_data.avro");
+ resource = classLoader.getResource("examples/batch/airlineStats/ingestionJobSpec.yaml");
Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, _dataFile);
- resource = classLoader.getResource("sample_data/airlineStats_offline_table_config.json");
+ FileUtils.copyURLToFile(resource, _ingestionJobSpecFile);
+ resource = classLoader.getResource("examples/batch/airlineStats/airlineStats_offline_table_config.json");
Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, tableConfigFile);
- return new QuickstartTableRequest("airlineStats", _schemaFile, tableConfigFile, _offlineQuickStartDataDir,
+ return new QuickstartTableRequest("airlineStats", _schemaFile, tableConfigFile, _ingestionJobSpecFile, _offlineQuickStartDataDir,
FileFormat.AVRO);
}
@@ -92,12 +93,17 @@ public class HybridQuickstart {
Preconditions.checkState(_realtimeQuickStartDataDir.mkdirs());
}
+ _dataFile = new File(_realtimeQuickStartDataDir, "airlineStats_data.avro");
File tableConfigFile = new File(_realtimeQuickStartDataDir, "airlineStats_realtime_table_config.json");
URL resource = Quickstart.class.getClassLoader().getResource(
- "sample_data/airlineStats_realtime_table_config.json");
+ "examples/stream/airlineStats/airlineStats_realtime_table_config.json");
Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, tableConfigFile);
+ resource = Quickstart.class.getClassLoader().getResource(
+ "examples/stream/airlineStats/sample_data/airlineStats_data.avro");
+ Preconditions.checkNotNull(resource);
+ FileUtils.copyURLToFile(resource, _dataFile);
return new QuickstartTableRequest("airlineStats", _schemaFile, tableConfigFile);
}
@@ -110,7 +116,7 @@ public class HybridQuickstart {
throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
}
_kafkaStarter.start();
- _kafkaStarter.createTopic("airlineStatsEvents", KafkaStarterUtils.getTopicCreationProps(10));
+ _kafkaStarter.createTopic("flights-realtime", KafkaStarterUtils.getTopicCreationProps(10));
}
public void execute()
@@ -132,12 +138,11 @@ public class HybridQuickstart {
runner.createBrokerTenantWith(2, "airline_broker");
printStatus(Color.YELLOW, "***** Adding airlineStats offline and realtime table *****");
runner.addTable();
- printStatus(Color.YELLOW, "***** Building index segment for airlineStats *****");
- runner.buildSegment();
- printStatus(Color.YELLOW, "***** Pushing segment to the controller *****");
- runner.pushSegment();
+ printStatus(Color.YELLOW, "***** Launch data ingestion job to build index segments for airlineStats and push to controller *****");
+ runner.launchDataIngestionJob();
printStatus(Color.YELLOW, "***** Starting airline data stream and publishing to Kafka *****");
+
final AirlineDataStream stream = new AirlineDataStream(Schema.fromFile(_schemaFile), _dataFile);
stream.run();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
index 9427b52..b0662e9 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
@@ -126,32 +126,34 @@ public class Quickstart {
File schemaFile = new File(quickStartDataDir, "baseballStats_schema.json");
File dataFile = new File(quickStartDataDir, "baseballStats_data.csv");
File tableConfigFile = new File(quickStartDataDir, "baseballStats_offline_table_config.json");
+ File ingestionJobSpecFile = new File(quickStartDataDir, "ingestionJobSpec.yaml");
ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource = classLoader.getResource("sample_data/baseballStats_schema.json");
+ URL resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json");
com.google.common.base.Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, schemaFile);
- resource = classLoader.getResource("sample_data/baseballStats_data.csv");
+ resource = classLoader.getResource("examples/batch/baseballStats/rawdata/baseballStats_data.csv");
com.google.common.base.Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, dataFile);
- resource = classLoader.getResource("sample_data/baseballStats_offline_table_config.json");
+ resource = classLoader.getResource("examples/batch/baseballStats/ingestionJobSpec.yaml");
+ com.google.common.base.Preconditions.checkNotNull(resource);
+ FileUtils.copyURLToFile(resource, ingestionJobSpecFile);
+ resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_offline_table_config.json");
com.google.common.base.Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, tableConfigFile);
File tempDir = new File("/tmp", String.valueOf(System.currentTimeMillis()));
Preconditions.checkState(tempDir.mkdirs());
QuickstartTableRequest request =
- new QuickstartTableRequest("baseballStats", schemaFile, tableConfigFile, quickStartDataDir, FileFormat.CSV);
+ new QuickstartTableRequest("baseballStats", schemaFile, tableConfigFile, ingestionJobSpecFile, quickStartDataDir, FileFormat.CSV);
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, tempDir);
printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
runner.startAll();
printStatus(Color.CYAN, "***** Adding baseballStats table *****");
runner.addTable();
- printStatus(Color.CYAN, "***** Building index segment for baseballStats *****");
- runner.buildSegment();
- printStatus(Color.CYAN, "***** Pushing segment to the controller *****");
- runner.pushSegment();
+ printStatus(Color.CYAN, "***** Launch data ingestion job to build index segment for baseballStats and push to controller *****");
+ runner.launchDataIngestionJob();
printStatus(Color.CYAN, "***** Waiting for 5 seconds for the server to fetch the assigned segment *****");
Thread.sleep(5000);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
index 925be5b..9ead3d2 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
@@ -27,12 +27,13 @@ public class QuickstartTableRequest {
File schemaFile;
File tableRequestFile;
+ File ingestionJobFile;
File dataDir;
TableType tableType;
String tableName;
FileFormat segmentFileFormat = FileFormat.CSV;
- public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest, File dataDir,
+ public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest, File ingestionJobFile, File dataDir,
FileFormat segmentFileFormat) {
this.tableName = tableName;
this.schemaFile = schemaFile;
@@ -40,6 +41,7 @@ public class QuickstartTableRequest {
this.tableRequestFile = tableRequest;
tableType = TableType.OFFLINE;
this.segmentFileFormat = segmentFileFormat;
+ this.ingestionJobFile = ingestionJobFile;
}
public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest) {
@@ -73,6 +75,14 @@ public class QuickstartTableRequest {
this.tableRequestFile = tableRequestFile;
}
+ public File getIngestionJobFile() {
+ return ingestionJobFile;
+ }
+
+ public void setIngestionJobFile(File ingestionJobFile) {
+ this.ingestionJobFile = ingestionJobFile;
+ }
+
public File getDataDir() {
return dataDir;
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 82f76e2..c5dc44c 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -60,10 +60,10 @@ public class RealtimeQuickStart {
File tableConfigFile = new File(quickStartDataDir, "meetupRsvp_realtime_table_config.json");
ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource = classLoader.getResource("sample_data/meetupRsvp_schema.json");
+ URL resource = classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_schema.json");
com.google.common.base.Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, schemaFile);
- resource = classLoader.getResource("sample_data/meetupRsvp_realtime_table_config.json");
+ resource = classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json");
com.google.common.base.Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, tableConfigFile);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index 549ac52..0e97e24 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
+import org.apache.pinot.spi.batch.ingestion.IngestionJobLauncher;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.common.utils.TenantRole;
import org.apache.pinot.tools.QuickstartTableRequest;
@@ -162,29 +163,15 @@ public class QuickstartRunner {
}
}
- public void buildSegment()
+ public void launchDataIngestionJob()
throws Exception {
for (QuickstartTableRequest request : _tableRequests) {
if (request.getTableType() == TableType.OFFLINE) {
- File tempDir = new File(_tempDir, request.getTableName() + "_segment");
- new CreateSegmentCommand().setDataDir(request.getDataDir().getAbsolutePath())
- .setFormat(request.getSegmentFileFormat()).setSchemaFile(request.getSchemaFile().getAbsolutePath())
- .setTableName(request.getTableName())
- .setSegmentName(request.getTableName() + "_" + System.currentTimeMillis())
- .setOutDir(tempDir.getAbsolutePath()).execute();
- _segmentDirs.add(tempDir.getAbsolutePath());
+ IngestionJobLauncher.main(new String[]{request.getIngestionJobFile().getAbsolutePath()});
}
}
}
- public void pushSegment()
- throws Exception {
- for (String segmentDir : _segmentDirs) {
- new UploadSegmentCommand().setControllerPort(String.valueOf(_controllerPorts.get(0))).setSegmentDir(segmentDir)
- .execute();
- }
- }
-
public JsonNode runQuery(String query)
throws Exception {
int brokerPort = _brokerPorts.get(RANDOM.nextInt(_brokerPorts.size()));
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 139f108..43600ac 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -31,8 +31,8 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.tools.Command;
@@ -68,7 +68,6 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
@Option(name = "-millisBetweenMessages", required = false, metaVar = "<int>", usage = "Delay in milliseconds between messages (default 1000 ms)")
private String _millisBetweenMessages = "1000";
-
@Override
public boolean getHelp() {
return _help;
@@ -113,9 +112,11 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
StreamDataProducer streamDataProducer;
try {
- streamDataProducer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+ streamDataProducer =
+ StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
} catch (Exception e) {
- throw new RuntimeException("Failed to get StreamDataProducer - " + KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, e);
+ throw new RuntimeException("Failed to get StreamDataProducer - " + KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
+ e);
}
try {
// Open the Avro file
@@ -154,4 +155,29 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
savePID(System.getProperty("java.io.tmpdir") + File.separator + ".streamAvro.pid");
return true;
}
+
+ public StreamAvroIntoKafkaCommand setAvroFile(String avroFile) {
+ _avroFile = avroFile;
+ return this;
+ }
+
+ public StreamAvroIntoKafkaCommand setKafkaBrokerList(String kafkaBrokerList) {
+ _kafkaBrokerList = kafkaBrokerList;
+ return this;
+ }
+
+ public StreamAvroIntoKafkaCommand setKafkaTopic(String kafkaTopic) {
+ _kafkaTopic = kafkaTopic;
+ return this;
+ }
+
+ public StreamAvroIntoKafkaCommand setZkAddress(String zkAddress) {
+ _zkAddress = zkAddress;
+ return this;
+ }
+
+ public StreamAvroIntoKafkaCommand setOutputFormat(String outputFormat) {
+ _outputFormat = outputFormat;
+ return this;
+ }
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index d2aef53..cf483d3 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -18,8 +18,6 @@
*/
package org.apache.pinot.tools.streams;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -28,12 +26,13 @@ import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.tools.Quickstart;
@@ -42,6 +41,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * This is used in Hybrid Quickstart.
+ */
public class AirlineDataStream {
private static final Logger logger = LoggerFactory.getLogger(AirlineDataStream.class);
@@ -89,14 +91,14 @@ public class AirlineDataStream {
avroDataStream = null;
}
- private void publish(JsonNode message)
+ private void publish(GenericRecord message)
throws IOException {
if (!keepIndexing) {
avroDataStream.close();
avroDataStream = null;
return;
}
- producer.produce("airlineStatsEvents", message.toString().getBytes("UTF-8"));
+ producer.produce("flights-realtime", message.toString().getBytes("UTF-8"));
}
public void run() {
@@ -112,14 +114,15 @@ public class AirlineDataStream {
}
GenericRecord record = avroDataStream.next();
- ObjectNode message = JsonUtils.newObjectNode();
+
+ GenericRecord message = new GenericData.Record(AvroUtils.getAvroSchemaFromPinotSchema(pinotSchema));
for (FieldSpec spec : pinotSchema.getDimensionFieldSpecs()) {
- message.set(spec.getName(), JsonUtils.objectToJsonNode(record.get(spec.getName())));
+ message.put(spec.getName(), record.get(spec.getName()));
}
for (FieldSpec spec : pinotSchema.getDimensionFieldSpecs()) {
- message.set(spec.getName(), JsonUtils.objectToJsonNode(record.get(spec.getName())));
+ message.put(spec.getName(), record.get(spec.getName()));
}
TimeFieldSpec spec = pinotSchema.getTimeFieldSpec();
diff --git a/pinot-tools/src/main/resources/sample_data/baseballStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/baseballStats_offline_table_config.json
rename to pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
diff --git
a/pinot-tools/src/main/resources/sample_data/baseballStats_schema.json
b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_schema.json
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/baseballStats_schema.json
rename to
pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_schema.json
diff --git
a/pinot-tools/src/main/resources/examples/batch/baseballStats/ingestionJobSpec.yaml
b/pinot-tools/src/main/resources/examples/batch/baseballStats/ingestionJobSpec.yaml
new file mode 100644
index 0000000..83aca94
--- /dev/null
+++
b/pinot-tools/src/main/resources/examples/batch/baseballStats/ingestionJobSpec.yaml
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# executionFrameworkSpec: Defines the framework used to run ingestion jobs.
+executionFrameworkSpec:
+
+ # name: execution framework name
+ name: 'standalone'
+
+ # segmentGenerationJobRunnerClassName: class name that implements the org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
+ segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
+
+ # segmentTarPushJobRunnerClassName: class name that implements the org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
+ segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
+
+ # segmentUriPushJobRunnerClassName: class name that implements the org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
+ segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
+
+# jobType: Pinot ingestion job type.
+# Supported job types are:
+# 'SegmentCreation'
+# 'SegmentTarPush'
+# 'SegmentUriPush'
+# 'SegmentCreationAndTarPush'
+# 'SegmentCreationAndUriPush'
+jobType: SegmentCreationAndTarPush
+
+# inputDirURI: Root directory of input data; its URI scheme must be configured in PinotFS.
+inputDirURI: 'examples/batch/baseballStats/rawdata'
+
+# includeFileNamePattern: include file name pattern; glob patterns are supported.
+# Sample usage:
+# 'glob:*.avro' will include all avro files directly under inputDirURI, not in subdirectories;
+# 'glob:**/*.avro' will include all avro files under inputDirURI recursively.
+includeFileNamePattern: 'glob:**/*.csv'
+
+# excludeFileNamePattern: exclude file name pattern; glob patterns are supported.
+# Sample usage:
+# 'glob:*.avro' will exclude all avro files directly under inputDirURI, not in subdirectories;
+# 'glob:**/*.avro' will exclude all avro files under inputDirURI recursively.
+# _excludeFileNamePattern: ''
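The include/exclude semantics above follow java.nio glob syntax, so the difference between the two sample patterns can be checked directly (file names are illustrative):

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobCheck {
  public static void main(String[] args) {
    // '*' stays within one path component; '**' crosses directory boundaries.
    PathMatcher flat = FileSystems.getDefault().getPathMatcher("glob:*.csv");
    PathMatcher recursive = FileSystems.getDefault().getPathMatcher("glob:**/*.csv");

    Path top = Paths.get("baseballStats_data.csv");
    Path nested = Paths.get("rawdata/baseballStats_data.csv");

    System.out.println(flat.matches(top));         // true: directly under the input dir
    System.out.println(flat.matches(nested));      // false: '*' does not cross '/'
    System.out.println(recursive.matches(nested)); // true: '**' crosses directories
    System.out.println(recursive.matches(top));    // false: '**/' still requires a directory component
  }
}
```

The last line is a common gotcha: `glob:**/*.csv` does not match a top-level file, which is why the comments distinguish "just under the inputDirURI" from "recursively".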
+
+# outputDirURI: Root directory of output segments; its URI scheme must be configured in PinotFS.
+outputDirURI: 'examples/batch/baseballStats/segments'
+
+# overwriteOutput: Overwrite output segments if they already exist.
+overwriteOutput: true
+
+# pinotFSSpecs: defines all related Pinot file systems.
+pinotFSSpecs:
+
+ - # scheme: used to identify a PinotFS.
+ # E.g. local, hdfs, dbfs, etc
+ scheme: file
+
+ # className: Class name used to create the PinotFS instance.
+ # E.g.
+ # org.apache.pinot.spi.filesystem.LocalPinotFS is used for the local filesystem
+ # org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
+ # org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
+ className: org.apache.pinot.spi.filesystem.LocalPinotFS
+
+# recordReaderSpec: defines the record reader.
+recordReaderSpec:
+
+ # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift', etc.
+ dataFormat: 'csv'
+
+ # className: Corresponding RecordReader class name.
+ # E.g.
+ # org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
+ # org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
+ # org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
+ # org.apache.pinot.plugin.inputformat.json.JsonRecordReader
+ # org.apache.pinot.plugin.inputformat.orc.OrcRecordReader
+ # org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
+ className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
+
+ # configClassName: Corresponding RecordReaderConfig class name; it's mandatory for the CSV and Thrift file formats.
+ # E.g.
+ # org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig
+ # org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig
+ configClassName:
'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
+
+ # configs: Used to initialize the RecordReaderConfig class; this config is required for the CSV and Thrift data formats.
+ configs:
+
+
+# tableSpec: defines the table name and where to fetch the corresponding table config and table schema.
+tableSpec:
+
+ # tableName: Table name
+ tableName: 'baseballStats'
+
+ # schemaURI: defines where to read the table schema; supports PinotFS or HTTP.
+ # E.g.
+ # hdfs://path/to/table_schema.json
+ # http://localhost:9000/tables/myTable/schema
+ schemaURI: 'http://localhost:9000/tables/baseballStats/schema'
+
+ # tableConfigURI: defines where to read the table config.
+ # Supports PinotFS or HTTP.
+ # E.g.
+ # hdfs://path/to/table_config.json
+ # http://localhost:9000/tables/myTable
+ # Note that the controller API that returns the table config wraps it in a JSON object;
+ # the real table config is the object under the field 'OFFLINE'.
+ tableConfigURI: 'http://localhost:9000/tables/baseballStats'
+
+# pinotClusterSpecs: defines the Pinot Cluster Access Point.
+pinotClusterSpecs:
+ - # controllerURI: used to fetch table/schema information and data push.
+ # E.g. http://localhost:9000
+ controllerURI: 'http://localhost:9000'
+
+# pushJobSpec: defines segment push job related configuration.
+pushJobSpec:
+
+ # pushAttempts: number of attempts for the push job; default is 1, which means no retry.
+ pushAttempts: 2
+
+ # pushRetryIntervalMillis: retry wait in milliseconds; defaults to 1 second.
+ pushRetryIntervalMillis: 1000
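`pushAttempts` and `pushRetryIntervalMillis` together bound the push job's retry behavior. A sketch of the loop these two settings imply (the `Push` interface and `pushWithRetry` helper are hypothetical, not Pinot's actual runner API):

```java
public class PushRetrySketch {
  interface Push {
    void run() throws Exception; // throws on a failed push
  }

  // Tries up to pushAttempts times (assumed >= 1), sleeping
  // retryIntervalMillis between attempts, and rethrows the last
  // failure once all attempts are exhausted.
  static void pushWithRetry(Push push, int pushAttempts, long retryIntervalMillis)
      throws Exception {
    Exception last = null;
    for (int attempt = 1; attempt <= pushAttempts; attempt++) {
      try {
        push.run();
        return; // success, stop retrying
      } catch (Exception e) {
        last = e;
        if (attempt < pushAttempts) {
          Thread.sleep(retryIntervalMillis);
        }
      }
    }
    throw last;
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    // Fails once, then succeeds: with pushAttempts=2 (as in the spec above) this completes.
    pushWithRetry(() -> {
      if (calls[0]++ == 0) {
        throw new RuntimeException("transient push failure");
      }
    }, 2, 10);
    System.out.println("push succeeded after " + calls[0] + " attempts");
  }
}
```

With the spec's values (`pushAttempts: 2`, `pushRetryIntervalMillis: 1000`), a single transient failure is retried once after one second; a second failure fails the job.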
diff --git a/pinot-tools/src/main/resources/sample_data/baseballStats_data.csv
b/pinot-tools/src/main/resources/examples/batch/baseballStats/rawdata/baseballStats_data.csv
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/baseballStats_data.csv
rename to
pinot-tools/src/main/resources/examples/batch/baseballStats/rawdata/baseballStats_data.csv
diff --git
a/pinot-tools/src/main/resources/sample_data/kafka_2.0/airlineStats_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
similarity index 100%
copy from
pinot-tools/src/main/resources/sample_data/kafka_2.0/airlineStats_realtime_table_config.json
copy to
pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
diff --git
a/pinot-tools/src/main/resources/sample_data/airlineStats_schema.json
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/airlineStats_schema.json
rename to
pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
diff --git
a/pinot-tools/src/main/resources/sample_data/docker/airlineStats_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/airlineStats/docker/airlineStats_realtime_table_config.json
similarity index 100%
rename from
pinot-tools/src/main/resources/sample_data/docker/airlineStats_realtime_table_config.json
rename to
pinot-tools/src/main/resources/examples/stream/airlineStats/docker/airlineStats_realtime_table_config.json
diff --git
a/pinot-tools/src/main/resources/sample_data/kafka_0.9/airlineStats_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_0.9/airlineStats_realtime_table_config.json
similarity index 91%
rename from
pinot-tools/src/main/resources/sample_data/kafka_0.9/airlineStats_realtime_table_config.json
rename to
pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_0.9/airlineStats_realtime_table_config.json
index 4ea745e..f5c4652 100644
---
a/pinot-tools/src/main/resources/sample_data/kafka_0.9/airlineStats_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_0.9/airlineStats_realtime_table_config.json
@@ -23,6 +23,7 @@
"stream.kafka.consumer.type": "simple",
"stream.kafka.topic.name": "flights-realtime",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+ "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory",
"stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.broker.list": "localhost:19092",
diff --git
a/pinot-tools/src/main/resources/sample_data/kafka_2.0/airlineStats_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_2.0/airlineStats_realtime_table_config.json
similarity index 100%
rename from
pinot-tools/src/main/resources/sample_data/kafka_2.0/airlineStats_realtime_table_config.json
rename to
pinot-tools/src/main/resources/examples/stream/airlineStats/kafka_2.0/airlineStats_realtime_table_config.json
diff --git a/pinot-tools/src/main/resources/sample_data/airlineStats_data.avro
b/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.avro
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/airlineStats_data.avro
rename to
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.avro
diff --git a/pinot-tools/src/main/resources/sample_data/airlineStats_data.json
b/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.json
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/airlineStats_data.json
rename to
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.json
diff --git a/pinot-tools/src/main/resources/sample_data/airlineStats_data.orc
b/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.orc
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/airlineStats_data.orc
rename to
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.orc
diff --git
a/pinot-tools/src/main/resources/sample_data/kafka_0.9/meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
similarity index 87%
rename from
pinot-tools/src/main/resources/sample_data/kafka_0.9/meetupRsvp_realtime_table_config.json
rename to
pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
index 9b145bc..8d00c37 100644
---
a/pinot-tools/src/main/resources/sample_data/kafka_0.9/meetupRsvp_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
@@ -19,7 +19,8 @@
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory",
- "stream.kafka.zk.broker.url": "localhost:2191/kafka"
+ "stream.kafka.zk.broker.url": "localhost:2191/kafka",
+ "stream.kafka.hlc.bootstrap.server": "localhost:19092"
}
},
"metadata": {
diff --git
a/pinot-tools/src/main/resources/sample_data/kafka_2.0/meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
similarity index 100%
copy from
pinot-tools/src/main/resources/sample_data/kafka_2.0/meetupRsvp_realtime_table_config.json
copy to
pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
diff --git
a/pinot-tools/src/main/resources/sample_data/kafka_2.0/meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json
similarity index 100%
rename from
pinot-tools/src/main/resources/sample_data/kafka_2.0/meetupRsvp_realtime_table_config.json
rename to
pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json
diff --git a/pinot-tools/src/main/resources/sample_data/meetupRsvp_schema.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json
similarity index 100%
rename from pinot-tools/src/main/resources/sample_data/meetupRsvp_schema.json
rename to
pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json
diff --git
a/pinot-tools/src/main/resources/sample_data/airlineStats_offline_table_config.json
b/pinot-tools/src/main/resources/sample_data/airlineStats_offline_table_config.json
deleted file mode 100644
index de73b4c..0000000
---
a/pinot-tools/src/main/resources/sample_data/airlineStats_offline_table_config.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "tableName": "airlineStats",
- "tableType": "OFFLINE",
- "segmentsConfig": {
- "timeColumnName": "DaysSinceEpoch",
- "timeType": "DAYS",
- "segmentPushType": "APPEND",
- "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
- "replication": "1"
- },
- "tenants": {
- "broker": "airline_broker",
- "server": "airline"
- },
- "tableIndexConfig": {
- "loadMode": "MMAP"
- },
- "metadata": {
- "customConfigs": {}
- }
-}
diff --git
a/pinot-tools/src/main/resources/sample_data/airlineStats_realtime_table_config.json
b/pinot-tools/src/main/resources/sample_data/airlineStats_realtime_table_config.json
deleted file mode 100644
index 4ea745e..0000000
---
a/pinot-tools/src/main/resources/sample_data/airlineStats_realtime_table_config.json
+++ /dev/null
@@ -1,37 +0,0 @@
-{
- "tableName": "airlineStats",
- "tableType": "REALTIME",
- "segmentsConfig": {
- "timeColumnName": "DaysSinceEpoch",
- "timeType": "DAYS",
- "retentionTimeUnit": "DAYS",
- "retentionTimeValue": "5",
- "segmentPushType": "APPEND",
- "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
- "schemaName": "airlineStats",
- "replication": "1",
- "replicasPerPartition": "1"
- },
- "tenants": {
- "broker": "airline_broker",
- "server": "airline"
- },
- "tableIndexConfig": {
- "loadMode": "MMAP",
- "streamConfigs": {
- "streamType": "kafka",
- "stream.kafka.consumer.type": "simple",
- "stream.kafka.topic.name": "flights-realtime",
- "stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
- "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
- "stream.kafka.zk.broker.url": "localhost:2191/kafka",
- "stream.kafka.broker.list": "localhost:19092",
- "realtime.segment.flush.threshold.time": "3600000",
- "realtime.segment.flush.threshold.size": "50000",
- "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
- }
- },
- "metadata": {
- "customConfigs": {}
- }
-}
diff --git
a/pinot-tools/src/main/resources/sample_data/docker/airlineStats_offline_table_config.json
b/pinot-tools/src/main/resources/sample_data/docker/airlineStats_offline_table_config.json
deleted file mode 100644
index de73b4c..0000000
---
a/pinot-tools/src/main/resources/sample_data/docker/airlineStats_offline_table_config.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "tableName": "airlineStats",
- "tableType": "OFFLINE",
- "segmentsConfig": {
- "timeColumnName": "DaysSinceEpoch",
- "timeType": "DAYS",
- "segmentPushType": "APPEND",
- "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
- "replication": "1"
- },
- "tenants": {
- "broker": "airline_broker",
- "server": "airline"
- },
- "tableIndexConfig": {
- "loadMode": "MMAP"
- },
- "metadata": {
- "customConfigs": {}
- }
-}
diff --git
a/pinot-tools/src/main/resources/sample_data/meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/sample_data/meetupRsvp_realtime_table_config.json
deleted file mode 100644
index 9b145bc..0000000
---
a/pinot-tools/src/main/resources/sample_data/meetupRsvp_realtime_table_config.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
- "tableName": "meetupRsvp",
- "tableType": "REALTIME",
- "segmentsConfig": {
- "timeColumnName": "mtime",
- "timeType": "MILLISECONDS",
- "segmentPushType": "APPEND",
- "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
- "schemaName": "meetupRsvp",
- "replication": "1"
- },
- "tenants": {},
- "tableIndexConfig": {
- "loadMode": "MMAP",
- "streamConfigs": {
- "streamType": "kafka",
- "stream.kafka.consumer.type": "highLevel",
- "stream.kafka.topic.name": "meetupRSVPEvents",
- "stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
- "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
- "stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory",
- "stream.kafka.zk.broker.url": "localhost:2191/kafka"
- }
- },
- "metadata": {
- "customConfigs": {}
- }
-}
diff --git a/pom.xml b/pom.xml
index c152598..e9576c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,11 +137,23 @@
kafka dependency is still explicitly defined in pinot-integration-tests,
pinot-tools and pinot-perf pom files.
To change kafka connector dependency, we only need to update this version
number config.
TODO: figure out a way to inject kafka dependency instead of explicitly
setting the kafka module dependency -->
- <kafka.version>0.9</kafka.version>
+ <kafka.version>2.0</kafka.version>
</properties>
<profiles>
<profile>
+ <id>kafka-0.9</id>
+ <activation>
+ <property>
+ <name>kafka.version</name>
+ <value>0.9</value>
+ </property>
+ </activation>
+ <properties>
+ <kafka.version>0.9</kafka.version>
+ </properties>
+ </profile>
+ <profile>
<id>kafka-2.0</id>
<activation>
<property>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]